From 5e9e5b76bc1534ece816f7c91c715bb3fa7c0b80 Mon Sep 17 00:00:00 2001 From: Lambda Date: Sun, 19 May 2024 18:06:13 +0000 Subject: [PATCH] service/sending: factor out closures into methods --- src/service/sending.rs | 207 ++++++++++++++++++++++------------------- 1 file changed, 111 insertions(+), 96 deletions(-) diff --git a/src/service/sending.rs b/src/service/sending.rs index 1cef2ab2..832a6f87 100644 --- a/src/service/sending.rs +++ b/src/service/sending.rs @@ -108,6 +108,14 @@ enum TransactionStatus { Retrying(u32), } +struct HandlerInputs { + kind: OutgoingKind, + events: Vec, +} +type HandlerResponse = Result; + +type TransactionStatusMap = HashMap; + impl Service { pub(crate) fn build(db: &'static dyn Data, config: &Config) -> Arc { let (sender, receiver) = mpsc::unbounded_channel(); @@ -128,14 +136,12 @@ impl Service { }); } - #[allow(clippy::too_many_lines)] async fn handler(&self) -> Result<()> { let mut receiver = self.receiver.lock().await; let mut futures = FuturesUnordered::new(); - let mut current_transaction_status = - HashMap::::new(); + let mut current_transaction_status = TransactionStatusMap::new(); // Retry requests we could not finish yet let mut initial_transactions = @@ -165,106 +171,115 @@ impl Service { futures.push(Self::handle_events(outgoing_kind.clone(), events)); } - let handle_futures = - |response, - current_transaction_status: &mut HashMap<_, _>, - futures: &mut FuturesUnordered<_>| { - match response { - Ok(outgoing_kind) => { - self.db - .delete_all_active_requests_for(&outgoing_kind)?; - - // Find events that have been added since starting the - // last request - let new_events = self - .db - .queued_requests(&outgoing_kind) - .filter_map(Result::ok) - .take(30) - .collect::>(); - - if new_events.is_empty() { - current_transaction_status.remove(&outgoing_kind); - } else { - // Insert pdus we found - self.db.mark_as_active(&new_events)?; - - futures.push(Self::handle_events( - outgoing_kind.clone(), - new_events - .into_iter() - .map(|(event, _)| event) - .collect(), - )); - } - } - Err((outgoing_kind, _)) => { - current_transaction_status - .entry(outgoing_kind) - .and_modify(|e| { - *e = match e { - TransactionStatus::Running => { - TransactionStatus::Failed( - 1, - Instant::now(), - ) - } - TransactionStatus::Retrying(n) => { - TransactionStatus::Failed( - *n + 1, - Instant::now(), - ) - } - TransactionStatus::Failed(..) => { - error!( - "Request that was not even \ - running failed?!" - ); - return; - } - } - }); - } - }; - - Result::<_>::Ok(()) - }; - - let handle_receiver = - |outgoing_kind, - event, - key, - current_transaction_status: &mut HashMap<_, _>, - futures: &mut FuturesUnordered<_>| { - if let Ok(Some(events)) = self.select_events( - &outgoing_kind, - vec![(event, key)], - current_transaction_status, - ) { - futures.push(Self::handle_events(outgoing_kind, events)); - } - }; - loop { select! { Some(response) = futures.next() => - handle_futures( - response, - &mut current_transaction_status, - &mut futures, - )?, + if let Some(HandlerInputs { kind, events }) = + self.handle_futures( + response, + &mut current_transaction_status, + )? + { + futures.push(Self::handle_events(kind, events)); + }, Some((outgoing_kind, event, key)) = receiver.recv() => - handle_receiver( - outgoing_kind, - event, - key, - &mut current_transaction_status, - &mut futures, - ), + if let Some(HandlerInputs { kind, events }) = + self.handle_receiver( + outgoing_kind, + event, + key, + &mut current_transaction_status, + ) + { + futures.push(Self::handle_events(kind, events)); + } } } } + fn handle_futures( + &self, + response: HandlerResponse, + current_transaction_status: &mut TransactionStatusMap, + ) -> Result> { + match response { + Ok(outgoing_kind) => { + self.db.delete_all_active_requests_for(&outgoing_kind)?; + + // Find events that have been added since starting the + // last request + let new_events = self + .db + .queued_requests(&outgoing_kind) + .filter_map(Result::ok) + .take(30) + .collect::>(); + + if new_events.is_empty() { + current_transaction_status.remove(&outgoing_kind); + Ok(None) + } else { + // Insert pdus we found + self.db.mark_as_active(&new_events)?; + + Ok(Some(HandlerInputs { + kind: outgoing_kind.clone(), + events: new_events + .into_iter() + .map(|(event, _)| event) + .collect(), + })) + } + } + Err((outgoing_kind, _)) => { + current_transaction_status.entry(outgoing_kind).and_modify( + |e| { + *e = match e { + TransactionStatus::Running => { + TransactionStatus::Failed(1, Instant::now()) + } + TransactionStatus::Retrying(n) => { + TransactionStatus::Failed( + *n + 1, + Instant::now(), + ) + } + TransactionStatus::Failed(..) => { + error!( + "Request that was not even running \ + failed?!" + ); + return; + } + } + }, + ); + Ok(None) + } + } + } + + fn handle_receiver( + &self, + outgoing_kind: OutgoingKind, + event: SendingEventType, + key: Vec, + current_transaction_status: &mut TransactionStatusMap, + ) -> Option { + if let Ok(Some(events)) = self.select_events( + &outgoing_kind, + vec![(event, key)], + current_transaction_status, + ) { + Some(HandlerInputs { + kind: outgoing_kind, + events, + }) + } else { + None + } + } + #[tracing::instrument(skip( self, outgoing_kind, @@ -566,7 +581,7 @@ impl Service { async fn handle_events( kind: OutgoingKind, events: Vec, - ) -> Result { + ) -> HandlerResponse { match &kind { OutgoingKind::Appservice(id) => { let mut pdu_jsons = Vec::new();