diff --git a/src/database/key_value/sending.rs b/src/database/key_value/sending.rs index a03b2596..10d36ad3 100644 --- a/src/database/key_value/sending.rs +++ b/src/database/key_value/sending.rs @@ -4,7 +4,7 @@ use crate::{ database::KeyValueDatabase, service::{ self, - sending::{OutgoingKind, RequestKey, SendingEventType}, + sending::{Destination, RequestKey, SendingEventType}, }, services, utils, Error, Result, }; @@ -13,9 +13,8 @@ impl service::sending::Data for KeyValueDatabase { fn active_requests<'a>( &'a self, ) -> Box< - dyn Iterator< - Item = Result<(RequestKey, OutgoingKind, SendingEventType)>, - > + 'a, + dyn Iterator> + + 'a, > { Box::new(self.servercurrentevent_data.iter().map(|(key, v)| { let key = RequestKey::new(key); @@ -25,10 +24,10 @@ impl service::sending::Data for KeyValueDatabase { fn active_requests_for<'a>( &'a self, - outgoing_kind: &OutgoingKind, + destination: &Destination, ) -> Box> + 'a> { - let prefix = outgoing_kind.get_prefix(); + let prefix = destination.get_prefix(); Box::new(self.servercurrentevent_data.scan_prefix(prefix).map( |(key, v)| { let key = RequestKey::new(key); @@ -43,9 +42,9 @@ impl service::sending::Data for KeyValueDatabase { fn delete_all_active_requests_for( &self, - outgoing_kind: &OutgoingKind, + destination: &Destination, ) -> Result<()> { - let prefix = outgoing_kind.get_prefix(); + let prefix = destination.get_prefix(); for (key, _) in self.servercurrentevent_data.scan_prefix(prefix) { self.servercurrentevent_data.remove(&key)?; } @@ -55,12 +54,12 @@ impl service::sending::Data for KeyValueDatabase { fn queue_requests( &self, - requests: &[(&OutgoingKind, SendingEventType)], + requests: &[(&Destination, SendingEventType)], ) -> Result> { let mut batch = Vec::new(); let mut keys = Vec::new(); - for (outgoing_kind, event) in requests { - let mut key = outgoing_kind.get_prefix(); + for (destination, event) in requests { + let mut key = destination.get_prefix(); if let SendingEventType::Pdu(value) = &event { key.extend_from_slice(value); } else { @@ -82,10 +81,10 @@ impl service::sending::Data for KeyValueDatabase { fn queued_requests<'a>( &'a self, - outgoing_kind: &OutgoingKind, + destination: &Destination, ) -> Box> + 'a> { - let prefix = outgoing_kind.get_prefix(); + let prefix = destination.get_prefix(); return Box::new(self.servernameevent_data.scan_prefix(prefix).map( |(k, v)| { let k = RequestKey::new(k); @@ -136,7 +135,7 @@ impl service::sending::Data for KeyValueDatabase { fn parse_servercurrentevent( key: &RequestKey, value: Vec, -) -> Result<(OutgoingKind, SendingEventType)> { +) -> Result<(Destination, SendingEventType)> { let key = key.as_bytes(); // Appservices start with a plus Ok::<_, Error>(if key.starts_with(b"+") { @@ -154,7 +153,7 @@ fn parse_servercurrentevent( })?; ( - OutgoingKind::Appservice(server), + Destination::Appservice(server), if value.is_empty() { SendingEventType::Pdu(event.to_vec()) } else { @@ -185,7 +184,7 @@ fn parse_servercurrentevent( })?; ( - OutgoingKind::Push(user_id, pushkey_string), + Destination::Push(user_id, pushkey_string), if value.is_empty() { SendingEventType::Pdu(event.to_vec()) } else { @@ -208,7 +207,7 @@ fn parse_servercurrentevent( })?; ( - OutgoingKind::Normal(ServerName::parse(server).map_err(|_| { + Destination::Normal(ServerName::parse(server).map_err(|_| { Error::bad_database( "Invalid server string in server_currenttransaction", ) diff --git a/src/service/sending.rs b/src/service/sending.rs index 6e1c03a7..ec4768bf 100644 --- a/src/service/sending.rs +++ b/src/service/sending.rs @@ -45,30 +45,30 @@ use crate::{ }; #[derive(Clone, Debug, PartialEq, Eq, Hash)] -pub(crate) enum OutgoingKind { +pub(crate) enum Destination { Appservice(String), // user and pushkey Push(OwnedUserId, String), Normal(OwnedServerName), } -impl OutgoingKind { +impl Destination { #[tracing::instrument(skip(self))] pub(crate) fn get_prefix(&self) -> Vec { let mut prefix = match self { - OutgoingKind::Appservice(server) => { + Destination::Appservice(server) => { let mut p = b"+".to_vec(); p.extend_from_slice(server.as_bytes()); p } - OutgoingKind::Push(user, pushkey) => { + Destination::Push(user, pushkey) => { let mut p = b"$".to_vec(); p.extend_from_slice(user.as_bytes()); p.push(0xFF); p.extend_from_slice(pushkey.as_bytes()); p } - OutgoingKind::Normal(server) => { + Destination::Normal(server) => { let mut p = Vec::new(); p.extend_from_slice(server.as_bytes()); p @@ -102,7 +102,7 @@ impl RequestKey { } pub(crate) struct RequestData { - outgoing_kind: OutgoingKind, + destination: Destination, event_type: SendingEventType, key: RequestKey, } @@ -126,18 +126,18 @@ enum TransactionStatus { } struct HandlerInputs { - kind: OutgoingKind, + destination: Destination, events: Vec, } -type HandlerResponse = Result; +type HandlerResponse = Result; -fn outgoing_kind_from_response(response: &HandlerResponse) -> &OutgoingKind { +fn destination_from_response(response: &HandlerResponse) -> &Destination { match response { Ok(kind) | Err((kind, _)) => kind, } } -type TransactionStatusMap = HashMap; +type TransactionStatusMap = HashMap; impl Service { pub(crate) fn build(db: &'static dyn Data, config: &Config) -> Arc { @@ -168,18 +168,18 @@ impl Service { // Retry requests we could not finish yet let mut initial_transactions = - HashMap::>::new(); + HashMap::>::new(); - for (key, outgoing_kind, event) in + for (key, destination, event) in self.db.active_requests().filter_map(Result::ok) { let entry = - initial_transactions.entry(outgoing_kind.clone()).or_default(); + initial_transactions.entry(destination.clone()).or_default(); if entry.len() > 30 { warn!( "Dropping some current events: {:?} {:?} {:?}", - key, outgoing_kind, event + key, destination, event ); self.db.delete_active_request(key)?; continue; @@ -188,33 +188,33 @@ impl Service { entry.push(event); } - for (outgoing_kind, events) in initial_transactions { + for (destination, events) in initial_transactions { current_transaction_status - .insert(outgoing_kind.clone(), TransactionStatus::Running); - futures.push(handle_events(outgoing_kind.clone(), events)); + .insert(destination.clone(), TransactionStatus::Running); + futures.push(handle_events(destination.clone(), events)); } loop { select! { Some(response) = futures.next() => { if let Some(HandlerInputs { - kind, + destination, events, }) = self.handle_response( response, &mut current_transaction_status, )? { - futures.push(handle_events(kind, events)); + futures.push(handle_events(destination, events)); } } Some(data) = receiver.recv() => { if let Some(HandlerInputs { - kind, + destination, events, }) = self .handle_receiver(data, &mut current_transaction_status) { - futures.push(handle_events(kind, events)); + futures.push(handle_events(destination, events)); } } } @@ -225,7 +225,7 @@ impl Service { skip(self, current_transaction_status), fields( current_status = ?current_transaction_status.get( - outgoing_kind_from_response(&response) + destination_from_response(&response) ), ), )] @@ -235,27 +235,27 @@ impl Service { current_transaction_status: &mut TransactionStatusMap, ) -> Result> { match response { - Ok(outgoing_kind) => { - self.db.delete_all_active_requests_for(&outgoing_kind)?; + Ok(destination) => { + self.db.delete_all_active_requests_for(&destination)?; // Find events that have been added since starting the // last request let new_events = self .db - .queued_requests(&outgoing_kind) + .queued_requests(&destination) .filter_map(Result::ok) .take(30) .collect::>(); if new_events.is_empty() { - current_transaction_status.remove(&outgoing_kind); + current_transaction_status.remove(&destination); Ok(None) } else { // Insert pdus we found self.db.mark_as_active(&new_events)?; Ok(Some(HandlerInputs { - kind: outgoing_kind.clone(), + destination: destination.clone(), events: new_events .into_iter() .map(|(event, _)| event) @@ -263,29 +263,23 @@ impl Service { })) } } - Err((outgoing_kind, _)) => { - current_transaction_status.entry(outgoing_kind).and_modify( - |e| { - *e = match e { - TransactionStatus::Running => { - TransactionStatus::Failed(1, Instant::now()) - } - TransactionStatus::Retrying(n) => { - TransactionStatus::Failed( - *n + 1, - Instant::now(), - ) - } - TransactionStatus::Failed(..) => { - error!( - "Request that was not even running \ - failed?!" - ); - return; - } + Err((destination, _)) => { + current_transaction_status.entry(destination).and_modify(|e| { + *e = match e { + TransactionStatus::Running => { + TransactionStatus::Failed(1, Instant::now()) } - }, - ); + TransactionStatus::Retrying(n) => { + TransactionStatus::Failed(*n + 1, Instant::now()) + } + TransactionStatus::Failed(..) => { + error!( + "Request that was not even running failed?!" + ); + return; + } + } + }); Ok(None) } } @@ -294,25 +288,25 @@ impl Service { #[tracing::instrument( skip(self, event_type, key, current_transaction_status), fields( - current_status = ?current_transaction_status.get(&outgoing_kind), + current_status = ?current_transaction_status.get(&destination), ), )] fn handle_receiver( &self, RequestData { - outgoing_kind, + destination, event_type, key, }: RequestData, current_transaction_status: &mut TransactionStatusMap, ) -> Option { if let Ok(Some(events)) = self.select_events( - &outgoing_kind, + &destination, vec![(event_type, key)], current_transaction_status, ) { Some(HandlerInputs { - kind: outgoing_kind, + destination, events, }) } else { @@ -324,23 +318,23 @@ impl Service { skip(self, new_events, current_transaction_status), fields( new_events = debug_slice_truncated(&new_events, 3), - current_status = ?current_transaction_status.get(outgoing_kind), + current_status = ?current_transaction_status.get(destination), ), )] fn select_events( &self, - outgoing_kind: &OutgoingKind, + destination: &Destination, // Events we want to send: event and full key new_events: Vec<(SendingEventType, RequestKey)>, current_transaction_status: &mut HashMap< - OutgoingKind, + Destination, TransactionStatus, >, ) -> Result>> { let mut retry = false; let mut allow = true; - let entry = current_transaction_status.entry(outgoing_kind.clone()); + let entry = current_transaction_status.entry(destination.clone()); entry .and_modify(|e| match e { @@ -377,10 +371,8 @@ impl Service { if retry { // We retry the previous transaction - for (_, e) in self - .db - .active_requests_for(outgoing_kind) - .filter_map(Result::ok) + for (_, e) in + self.db.active_requests_for(destination).filter_map(Result::ok) { events.push(e); } @@ -390,7 +382,7 @@ impl Service { events.push(e); } - if let OutgoingKind::Normal(server_name) = outgoing_kind { + if let Destination::Normal(server_name) = destination { if let Ok((select_edus, last_count)) = self.select_edus(server_name) { @@ -536,13 +528,13 @@ impl Service { user: &UserId, pushkey: String, ) -> Result<()> { - let outgoing_kind = OutgoingKind::Push(user.to_owned(), pushkey); + let destination = Destination::Push(user.to_owned(), pushkey); let event_type = SendingEventType::Pdu(pdu_id.to_owned()); let keys = - self.db.queue_requests(&[(&outgoing_kind, event_type.clone())])?; + self.db.queue_requests(&[(&destination, event_type.clone())])?; self.sender .send(RequestData { - outgoing_kind, + destination, event_type, key: keys.into_iter().next().unwrap(), }) @@ -561,7 +553,7 @@ impl Service { .into_iter() .map(|server| { ( - OutgoingKind::Normal(server), + Destination::Normal(server), SendingEventType::Pdu(pdu_id.to_owned()), ) }) @@ -569,11 +561,10 @@ impl Service { let keys = self.db.queue_requests( &requests.iter().map(|(o, e)| (o, e.clone())).collect::>(), )?; - for ((outgoing_kind, event_type), key) in requests.into_iter().zip(keys) - { + for ((destination, event_type), key) in requests.into_iter().zip(keys) { self.sender .send(RequestData { - outgoing_kind: outgoing_kind.clone(), + destination: destination.clone(), event_type, key, }) @@ -590,13 +581,13 @@ impl Service { serialized: Vec, id: u64, ) -> Result<()> { - let outgoing_kind = OutgoingKind::Normal(server.to_owned()); + let destination = Destination::Normal(server.to_owned()); let event_type = SendingEventType::Edu(serialized); let keys = - self.db.queue_requests(&[(&outgoing_kind, event_type.clone())])?; + self.db.queue_requests(&[(&destination, event_type.clone())])?; self.sender .send(RequestData { - outgoing_kind, + destination, event_type, key: keys.into_iter().next().unwrap(), }) @@ -611,13 +602,13 @@ impl Service { appservice_id: String, pdu_id: Vec, ) -> Result<()> { - let outgoing_kind = OutgoingKind::Appservice(appservice_id); + let destination = Destination::Appservice(appservice_id); let event_type = SendingEventType::Pdu(pdu_id); let keys = - self.db.queue_requests(&[(&outgoing_kind, event_type.clone())])?; + self.db.queue_requests(&[(&destination, event_type.clone())])?; self.sender .send(RequestData { - outgoing_kind, + destination, event_type, key: keys.into_iter().next().unwrap(), }) @@ -892,23 +883,23 @@ async fn handle_federation_event( #[tracing::instrument(skip_all)] async fn handle_events( - kind: OutgoingKind, + destination: Destination, events: Vec, ) -> HandlerResponse { - let ret = match &kind { - OutgoingKind::Appservice(id) => { + let ret = match &destination { + Destination::Appservice(id) => { handle_appservice_event(id, events).await } - OutgoingKind::Push(userid, pushkey) => { + Destination::Push(userid, pushkey) => { handle_push_event(userid, pushkey, events).await } - OutgoingKind::Normal(server) => { + Destination::Normal(server) => { handle_federation_event(server, events).await } }; match ret { - Ok(()) => Ok(kind), - Err(e) => Err((kind, e)), + Ok(()) => Ok(destination), + Err(e) => Err((destination, e)), } } diff --git a/src/service/sending/data.rs b/src/service/sending/data.rs index ad9915e5..00795faf 100644 --- a/src/service/sending/data.rs +++ b/src/service/sending/data.rs @@ -1,6 +1,6 @@ use ruma::ServerName; -use super::{OutgoingKind, RequestKey, SendingEventType}; +use super::{Destination, RequestKey, SendingEventType}; use crate::Result; pub(crate) trait Data: Send + Sync { @@ -8,26 +8,25 @@ pub(crate) trait Data: Send + Sync { fn active_requests<'a>( &'a self, ) -> Box< - dyn Iterator< - Item = Result<(RequestKey, OutgoingKind, SendingEventType)>, - > + 'a, + dyn Iterator> + + 'a, >; fn active_requests_for<'a>( &'a self, - outgoing_kind: &OutgoingKind, + destination: &Destination, ) -> Box> + 'a>; fn delete_active_request(&self, key: RequestKey) -> Result<()>; fn delete_all_active_requests_for( &self, - outgoing_kind: &OutgoingKind, + destination: &Destination, ) -> Result<()>; fn queue_requests( &self, - requests: &[(&OutgoingKind, SendingEventType)], + requests: &[(&Destination, SendingEventType)], ) -> Result>; fn queued_requests<'a>( &'a self, - outgoing_kind: &OutgoingKind, + destination: &Destination, ) -> Box> + 'a>; fn mark_as_active( &self,