diff --git a/src/api/client_server/sync.rs b/src/api/client_server/sync.rs index 71ec8497..bad7e304 100644 --- a/src/api/client_server/sync.rs +++ b/src/api/client_server/sync.rs @@ -1,5 +1,5 @@ use std::{ - collections::{hash_map::Entry, BTreeMap, BTreeSet, HashMap, HashSet}, + collections::{BTreeMap, BTreeSet, HashMap, HashSet}, sync::Arc, time::Duration, }; @@ -23,11 +23,9 @@ use ruma::{ room::member::{MembershipState, RoomMemberEventContent}, StateEventType, TimelineEventType, }, - uint, DeviceId, EventId, JsOption, OwnedDeviceId, OwnedUserId, RoomId, - UInt, UserId, + uint, DeviceId, EventId, JsOption, OwnedUserId, RoomId, UInt, UserId, }; -use tokio::sync::watch::Sender; -use tracing::{debug, error, info}; +use tracing::{debug, error}; use crate::{ service::{pdu::EventHash, rooms::timeline::PduCount}, @@ -73,10 +71,7 @@ use crate::{ /// For left rooms: /// - If the user left after `since`: `prev_batch` token, empty state (TODO: /// subset of the state at the point of the leave) -/// -/// - Sync is handled in an async task, multiple requests from the same device -/// with the same -/// `since` will be cached +#[allow(clippy::too_many_lines)] pub(crate) async fn sync_events_route( body: Ruma, ) -> Result> { @@ -84,109 +79,6 @@ pub(crate) async fn sync_events_route( let sender_device = body.sender_device.expect("user is authenticated"); let body = body.body; - let mut rx = match services() - .globals - .sync_receivers - .write() - .await - .entry((sender_user.clone(), sender_device.clone())) - { - Entry::Vacant(v) => { - let (tx, rx) = tokio::sync::watch::channel(None); - - v.insert((body.since.clone(), rx.clone())); - - tokio::spawn(sync_helper_wrapper( - sender_user.clone(), - sender_device.clone(), - body, - tx, - )); - - rx - } - Entry::Occupied(mut o) => { - if o.get().0 == body.since { - o.get().1.clone() - } else { - let (tx, rx) = tokio::sync::watch::channel(None); - - o.insert((body.since.clone(), rx.clone())); - - info!("Sync started for {sender_user}"); - - tokio::spawn(sync_helper_wrapper( - sender_user.clone(), - sender_device.clone(), - body, - tx, - )); - - rx - } - } - }; - - let we_have_to_wait = rx.borrow().is_none(); - if we_have_to_wait { - if let Err(e) = rx.changed().await { - error!("Error waiting for sync: {}", e); - } - } - - let result = match rx - .borrow() - .as_ref() - .expect("When sync channel changes it's always set to some") - { - Ok(response) => Ok(response.clone()), - Err(error) => Err(error.to_response()), - }; - - result -} - -async fn sync_helper_wrapper( - sender_user: OwnedUserId, - sender_device: OwnedDeviceId, - body: sync_events::v3::Request, - tx: Sender>>, -) { - let since = body.since.clone(); - - let r = sync_helper(sender_user.clone(), sender_device.clone(), body).await; - - if let Ok((_, caching_allowed)) = r { - if !caching_allowed { - match services() - .globals - .sync_receivers - .write() - .await - .entry((sender_user, sender_device)) - { - Entry::Occupied(o) => { - // Only remove if the device didn't start a different /sync - // already - if o.get().0 == since { - o.remove(); - } - } - Entry::Vacant(_) => {} - } - } - } - - tx.send(Some(r.map(|(r, _)| r))).expect("receiver should not be dropped"); -} - -#[allow(clippy::too_many_lines)] -async fn sync_helper( - sender_user: OwnedUserId, - sender_device: OwnedDeviceId, - body: sync_events::v3::Request, - // bool = caching allowed -) -> Result<(sync_events::v3::Response, bool), Error> { // Setup watchers, so if there's no response, we can wait for them let watcher = services().globals.watch(&sender_user, &sender_device); @@ -565,10 +457,10 @@ async fn sync_helper( Ok(x) => x.expect("watcher should succeed"), Err(error) => debug!(%error, "timed out"), }; - Ok((response, false)) + Ok(response) } else { // Only cache if we made progress - Ok((response, since != next_batch)) + Ok(response) } } diff --git a/src/service/globals.rs b/src/service/globals.rs index 87ad973e..a97f08a9 100644 --- a/src/service/globals.rs +++ b/src/service/globals.rs @@ -23,15 +23,12 @@ use hyper::{ }; use reqwest::dns::{Addrs, Resolve, Resolving}; use ruma::{ - api::{ - client::sync::sync_events, - federation::discovery::{ServerSigningKeys, VerifyKey}, - }, + api::federation::discovery::{ServerSigningKeys, VerifyKey}, serde::Base64, - DeviceId, OwnedDeviceId, OwnedEventId, OwnedRoomId, OwnedServerName, - OwnedServerSigningKeyId, OwnedUserId, RoomVersionId, ServerName, UserId, + DeviceId, OwnedEventId, OwnedRoomId, OwnedServerName, + OwnedServerSigningKeyId, RoomVersionId, ServerName, UserId, }; -use tokio::sync::{broadcast, watch::Receiver, Mutex, RwLock, Semaphore}; +use tokio::sync::{broadcast, Mutex, RwLock, Semaphore}; use tracing::{error, info}; use trust_dns_resolver::TokioAsyncResolver; @@ -41,12 +38,6 @@ type WellKnownMap = HashMap; type TlsNameMap = HashMap, u16)>; // Time if last failed try, number of failed tries type RateLimitState = (Instant, u32); -type SyncHandle = ( - // since - Option, - // rx - Receiver>>, -); pub(crate) struct Service { pub(crate) db: &'static dyn Data, @@ -70,8 +61,6 @@ pub(crate) struct Service { Arc>>, pub(crate) servername_ratelimiter: Arc>>>, - pub(crate) sync_receivers: - RwLock>, pub(crate) roomid_mutex_insert: RwLock>>>, pub(crate) roomid_mutex_state: RwLock>>>, @@ -237,7 +226,6 @@ impl Service { roomid_mutex_federation: RwLock::new(HashMap::new()), roomid_federationhandletime: RwLock::new(HashMap::new()), stateres_mutex: Arc::new(Mutex::new(())), - sync_receivers: RwLock::new(HashMap::new()), rotate: RotationHandler::new(), shutdown: AtomicBool::new(false), };