From 6bc17b268c797a1d9979f70d29fbc224b14218cd Mon Sep 17 00:00:00 2001 From: Charles Hall Date: Thu, 16 May 2024 19:04:29 -0700 Subject: [PATCH] factor select bodies into closures I would've preferred to factor these out into their own functions, but unfortunately the inner type of the `FuturesUnordered` is unnameable. `Box` or TAIT would help, but the former has a performance cost and the latter doesn't exist on stable yet. --- src/service/sending.rs | 120 +++++++++++++++++++++++++++-------------- 1 file changed, 80 insertions(+), 40 deletions(-) diff --git a/src/service/sending.rs b/src/service/sending.rs index ebfaf379..22216faf 100644 --- a/src/service/sending.rs +++ b/src/service/sending.rs @@ -123,6 +123,7 @@ impl Service { }); } + #[allow(clippy::too_many_lines)] async fn handler(&self) -> Result<()> { let mut receiver = self.receiver.lock().await; @@ -155,51 +156,90 @@ impl Service { futures.push(Self::handle_events(outgoing_kind.clone(), events)); } - loop { - select! { - Some(response) = futures.next() => { - match response { - Ok(outgoing_kind) => { - self.db.delete_all_active_requests_for(&outgoing_kind)?; + let handle_futures = |response, + current_transaction_status: &mut HashMap<_, _>, + futures: &mut FuturesUnordered<_>| { + match response { + Ok(outgoing_kind) => { + self.db.delete_all_active_requests_for(&outgoing_kind)?; - // Find events that have been added since starting the last request - let new_events = self.db.queued_requests(&outgoing_kind).filter_map(Result::ok).take(30).collect::>(); + // Find events that have been added since starting the + // last request + let new_events = self + .db + .queued_requests(&outgoing_kind) + .filter_map(Result::ok) + .take(30) + .collect::>(); - if new_events.is_empty() { - current_transaction_status.remove(&outgoing_kind); - } else { - // Insert pdus we found - self.db.mark_as_active(&new_events)?; + if new_events.is_empty() { + current_transaction_status.remove(&outgoing_kind); + } else { + // Insert pdus we found + self.db.mark_as_active(&new_events)?; - futures.push( - Self::handle_events( - outgoing_kind.clone(), - new_events.into_iter().map(|(event, _)| event).collect(), - ) - ); - } - } - Err((outgoing_kind, _)) => { - current_transaction_status.entry(outgoing_kind).and_modify(|e| *e = match e { - TransactionStatus::Running => TransactionStatus::Failed(1, Instant::now()), - TransactionStatus::Retrying(n) => TransactionStatus::Failed(*n+1, Instant::now()), - TransactionStatus::Failed(_, _) => { - error!("Request that was not even running failed?!"); - return - }, - }); - } - }; - }, - Some((outgoing_kind, event, key)) = receiver.recv() => { - if let Ok(Some(events)) = self.select_events( - &outgoing_kind, - vec![(event, key)], - &mut current_transaction_status, - ) { - futures.push(Self::handle_events(outgoing_kind, events)); + futures.push(Self::handle_events( + outgoing_kind.clone(), + new_events.into_iter().map(|(event, _)| event).collect(), + )); } } + Err((outgoing_kind, _)) => { + current_transaction_status + .entry(outgoing_kind) + .and_modify(|e| { + *e = match e { + TransactionStatus::Running => { + TransactionStatus::Failed(1, Instant::now()) + } + TransactionStatus::Retrying(n) => { + TransactionStatus::Failed(*n + 1, Instant::now()) + } + TransactionStatus::Failed(..) => { + error!( + "Request that was not even \ + running failed?!" + ); + return; + } + } + }); + } + }; + + Result::<_>::Ok(()) + }; + + let handle_receiver = |outgoing_kind, + event, + key, + current_transaction_status: &mut HashMap<_, _>, + futures: &mut FuturesUnordered<_>| { + if let Ok(Some(events)) = self.select_events( + &outgoing_kind, + vec![(event, key)], + current_transaction_status, + ) { + futures.push(Self::handle_events(outgoing_kind, events)); + } + }; + + loop { + select! { + Some(response) = futures.next() => + handle_futures( + response, + &mut current_transaction_status, + &mut futures, + )?, + Some((outgoing_kind, event, key)) = receiver.recv() => + handle_receiver( + outgoing_kind, + event, + key, + &mut current_transaction_status, + &mut futures, + ), } } }