mirror of https://gitlab.computer.surgery/matrix/grapevine.git (synced 2025-12-17 07:41:23 +01:00)
remove sync response cache
This cache can serve invalid responses, and has an extremely low hit rate.

It serves invalid responses because it's only keyed off the `since`
parameter, but many of the other request parameters also affect the
response or its side effects. This will become worse once we implement
filtering, because there will be a wider space of parameters with
different responses. This problem is fixable, but not worth fixing
because of the low hit rate.

The hit rate is low because normal clients always issue the next sync
request with `since` set to the `prev_batch` value of the previous
response. The only time we expect to see multiple requests with the same
`since` is when the response is empty, but we don't cache empty
responses.

This was confirmed experimentally by logging cache hits and misses over
15 minutes with a wide variety of clients. The test was run on
matrix.computer.surgery, which has only a few active users but a large
volume of sync traffic from many rooms. Over the test period, we saw
3 hits and 5309 misses. All hits occurred in the first minute, so I
suspect they had something to do with clients recovering from an offline
state. The clients connected during the test were:

- element web
- schildichat web
- iamb
- gomuks
- nheko
- fractal
- fluffychat web
- fluffychat android
- cinny web
- element android
- element X android

Fixes: #2
parent 5cb2551422
commit 146465693e
2 changed files with 10 additions and 130 deletions
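To illustrate the keying problem described in the commit message, here is a minimal, hypothetical sketch; the `SyncRequest`, `SyncResponse`, and `SyncCache` types below are illustrative stand-ins, not grapevine's actual code. Two requests that share `since` but differ in other parameters (here `full_state` and `filter`) map to the same cache entry, so one can be answered with a response that was computed for the other.

use std::collections::HashMap;

// Illustrative stand-ins, not grapevine's real request/response types.
struct SyncRequest {
    since: Option<String>,
    full_state: bool,       // affects the response, but ignored by the key
    filter: Option<String>, // affects the response, but ignored by the key
}

#[derive(Clone)]
struct SyncResponse {
    next_batch: String,
}

// A cache keyed only off `since`, like the one removed by this commit:
// a response computed for one (full_state, filter) combination is served
// to any later request that happens to share the same `since`.
struct SyncCache {
    entries: HashMap<Option<String>, SyncResponse>,
}

impl SyncCache {
    fn get(&self, request: &SyncRequest) -> Option<SyncResponse> {
        // `full_state` and `filter` never reach the lookup, which is why a
        // cached response built for different parameters can be returned.
        self.entries.get(&request.since).cloned()
    }

    fn insert(&mut self, request: &SyncRequest, response: SyncResponse) {
        self.entries.insert(request.since.clone(), response);
    }
}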
@@ -1,5 +1,5 @@
 use std::{
-    collections::{hash_map::Entry, BTreeMap, BTreeSet, HashMap, HashSet},
+    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
     sync::Arc,
     time::Duration,
 };
@@ -23,11 +23,9 @@ use ruma::{
         room::member::{MembershipState, RoomMemberEventContent},
         StateEventType, TimelineEventType,
     },
-    uint, DeviceId, EventId, JsOption, OwnedDeviceId, OwnedUserId, RoomId,
-    UInt, UserId,
+    uint, DeviceId, EventId, JsOption, OwnedUserId, RoomId, UInt, UserId,
 };
-use tokio::sync::watch::Sender;
-use tracing::{debug, error, info};
+use tracing::{debug, error};
 
 use crate::{
     service::{pdu::EventHash, rooms::timeline::PduCount},
@@ -73,10 +71,7 @@ use crate::{
 /// For left rooms:
 /// - If the user left after `since`: `prev_batch` token, empty state (TODO:
 ///   subset of the state at the point of the leave)
-///
-/// - Sync is handled in an async task, multiple requests from the same device
-///   with the same
-///   `since` will be cached
+#[allow(clippy::too_many_lines)]
 pub(crate) async fn sync_events_route(
     body: Ruma<sync_events::v3::Request>,
 ) -> Result<sync_events::v3::Response, RumaResponse<UiaaResponse>> {
@@ -84,109 +79,6 @@ pub(crate) async fn sync_events_route(
     let sender_device = body.sender_device.expect("user is authenticated");
     let body = body.body;
 
-    let mut rx = match services()
-        .globals
-        .sync_receivers
-        .write()
-        .await
-        .entry((sender_user.clone(), sender_device.clone()))
-    {
-        Entry::Vacant(v) => {
-            let (tx, rx) = tokio::sync::watch::channel(None);
-
-            v.insert((body.since.clone(), rx.clone()));
-
-            tokio::spawn(sync_helper_wrapper(
-                sender_user.clone(),
-                sender_device.clone(),
-                body,
-                tx,
-            ));
-
-            rx
-        }
-        Entry::Occupied(mut o) => {
-            if o.get().0 == body.since {
-                o.get().1.clone()
-            } else {
-                let (tx, rx) = tokio::sync::watch::channel(None);
-
-                o.insert((body.since.clone(), rx.clone()));
-
-                info!("Sync started for {sender_user}");
-
-                tokio::spawn(sync_helper_wrapper(
-                    sender_user.clone(),
-                    sender_device.clone(),
-                    body,
-                    tx,
-                ));
-
-                rx
-            }
-        }
-    };
-
-    let we_have_to_wait = rx.borrow().is_none();
-    if we_have_to_wait {
-        if let Err(e) = rx.changed().await {
-            error!("Error waiting for sync: {}", e);
-        }
-    }
-
-    let result = match rx
-        .borrow()
-        .as_ref()
-        .expect("When sync channel changes it's always set to some")
-    {
-        Ok(response) => Ok(response.clone()),
-        Err(error) => Err(error.to_response()),
-    };
-
-    result
-}
-
-async fn sync_helper_wrapper(
-    sender_user: OwnedUserId,
-    sender_device: OwnedDeviceId,
-    body: sync_events::v3::Request,
-    tx: Sender<Option<Result<sync_events::v3::Response>>>,
-) {
-    let since = body.since.clone();
-
-    let r = sync_helper(sender_user.clone(), sender_device.clone(), body).await;
-
-    if let Ok((_, caching_allowed)) = r {
-        if !caching_allowed {
-            match services()
-                .globals
-                .sync_receivers
-                .write()
-                .await
-                .entry((sender_user, sender_device))
-            {
-                Entry::Occupied(o) => {
-                    // Only remove if the device didn't start a different /sync
-                    // already
-                    if o.get().0 == since {
-                        o.remove();
-                    }
-                }
-                Entry::Vacant(_) => {}
-            }
-        }
-    }
-
-    tx.send(Some(r.map(|(r, _)| r))).expect("receiver should not be dropped");
-}
-
-#[allow(clippy::too_many_lines)]
-async fn sync_helper(
-    sender_user: OwnedUserId,
-    sender_device: OwnedDeviceId,
-    body: sync_events::v3::Request,
-    // bool = caching allowed
-) -> Result<(sync_events::v3::Response, bool), Error> {
     // Setup watchers, so if there's no response, we can wait for them
     let watcher = services().globals.watch(&sender_user, &sender_device);
 
@@ -565,10 +457,10 @@ async fn sync_helper(
             Ok(x) => x.expect("watcher should succeed"),
             Err(error) => debug!(%error, "timed out"),
         };
-        Ok((response, false))
+        Ok(response)
     } else {
        // Only cache if we made progress
-        Ok((response, since != next_batch))
+        Ok(response)
     }
 }

@@ -23,15 +23,12 @@ use hyper::{
 };
 use reqwest::dns::{Addrs, Resolve, Resolving};
 use ruma::{
-    api::{
-        client::sync::sync_events,
-        federation::discovery::{ServerSigningKeys, VerifyKey},
-    },
+    api::federation::discovery::{ServerSigningKeys, VerifyKey},
     serde::Base64,
-    DeviceId, OwnedDeviceId, OwnedEventId, OwnedRoomId, OwnedServerName,
-    OwnedServerSigningKeyId, OwnedUserId, RoomVersionId, ServerName, UserId,
+    DeviceId, OwnedEventId, OwnedRoomId, OwnedServerName,
+    OwnedServerSigningKeyId, RoomVersionId, ServerName, UserId,
 };
-use tokio::sync::{broadcast, watch::Receiver, Mutex, RwLock, Semaphore};
+use tokio::sync::{broadcast, Mutex, RwLock, Semaphore};
 use tracing::{error, info};
 use trust_dns_resolver::TokioAsyncResolver;
@@ -41,12 +38,6 @@ type WellKnownMap = HashMap<OwnedServerName, (FedDest, String)>;
 type TlsNameMap = HashMap<String, (Vec<IpAddr>, u16)>;
 // Time if last failed try, number of failed tries
 type RateLimitState = (Instant, u32);
-type SyncHandle = (
-    // since
-    Option<String>,
-    // rx
-    Receiver<Option<Result<sync_events::v3::Response>>>,
-);
 
 pub(crate) struct Service {
     pub(crate) db: &'static dyn Data,
@@ -70,8 +61,6 @@ pub(crate) struct Service {
         Arc<RwLock<HashMap<OwnedServerName, RateLimitState>>>,
     pub(crate) servername_ratelimiter:
         Arc<RwLock<HashMap<OwnedServerName, Arc<Semaphore>>>>,
-    pub(crate) sync_receivers:
-        RwLock<HashMap<(OwnedUserId, OwnedDeviceId), SyncHandle>>,
     pub(crate) roomid_mutex_insert:
         RwLock<HashMap<OwnedRoomId, Arc<Mutex<()>>>>,
     pub(crate) roomid_mutex_state: RwLock<HashMap<OwnedRoomId, Arc<Mutex<()>>>>,
@@ -237,7 +226,6 @@ impl Service {
             roomid_mutex_federation: RwLock::new(HashMap::new()),
             roomid_federationhandletime: RwLock::new(HashMap::new()),
             stateres_mutex: Arc::new(Mutex::new(())),
-            sync_receivers: RwLock::new(HashMap::new()),
             rotate: RotationHandler::new(),
             shutdown: AtomicBool::new(false),
         };