service/sending: factor out closures into methods

This commit is contained in:
Lambda 2024-05-19 18:06:13 +00:00
parent 092315e2cd
commit 5e9e5b76bc

View file

@ -108,6 +108,14 @@ enum TransactionStatus {
Retrying(u32), Retrying(u32),
} }
struct HandlerInputs {
kind: OutgoingKind,
events: Vec<SendingEventType>,
}
type HandlerResponse = Result<OutgoingKind, (OutgoingKind, Error)>;
type TransactionStatusMap = HashMap<OutgoingKind, TransactionStatus>;
impl Service { impl Service {
pub(crate) fn build(db: &'static dyn Data, config: &Config) -> Arc<Self> { pub(crate) fn build(db: &'static dyn Data, config: &Config) -> Arc<Self> {
let (sender, receiver) = mpsc::unbounded_channel(); let (sender, receiver) = mpsc::unbounded_channel();
@ -128,14 +136,12 @@ impl Service {
}); });
} }
#[allow(clippy::too_many_lines)]
async fn handler(&self) -> Result<()> { async fn handler(&self) -> Result<()> {
let mut receiver = self.receiver.lock().await; let mut receiver = self.receiver.lock().await;
let mut futures = FuturesUnordered::new(); let mut futures = FuturesUnordered::new();
let mut current_transaction_status = let mut current_transaction_status = TransactionStatusMap::new();
HashMap::<OutgoingKind, TransactionStatus>::new();
// Retry requests we could not finish yet // Retry requests we could not finish yet
let mut initial_transactions = let mut initial_transactions =
@ -165,14 +171,40 @@ impl Service {
futures.push(Self::handle_events(outgoing_kind.clone(), events)); futures.push(Self::handle_events(outgoing_kind.clone(), events));
} }
let handle_futures = loop {
|response, select! {
current_transaction_status: &mut HashMap<_, _>, Some(response) = futures.next() =>
futures: &mut FuturesUnordered<_>| { if let Some(HandlerInputs { kind, events }) =
self.handle_futures(
response,
&mut current_transaction_status,
)?
{
futures.push(Self::handle_events(kind, events));
},
Some((outgoing_kind, event, key)) = receiver.recv() =>
if let Some(HandlerInputs { kind, events }) =
self.handle_receiver(
outgoing_kind,
event,
key,
&mut current_transaction_status,
)
{
futures.push(Self::handle_events(kind, events));
}
}
}
}
fn handle_futures(
&self,
response: HandlerResponse,
current_transaction_status: &mut TransactionStatusMap,
) -> Result<Option<HandlerInputs>> {
match response { match response {
Ok(outgoing_kind) => { Ok(outgoing_kind) => {
self.db self.db.delete_all_active_requests_for(&outgoing_kind)?;
.delete_all_active_requests_for(&outgoing_kind)?;
// Find events that have been added since starting the // Find events that have been added since starting the
// last request // last request
@ -185,29 +217,26 @@ impl Service {
if new_events.is_empty() { if new_events.is_empty() {
current_transaction_status.remove(&outgoing_kind); current_transaction_status.remove(&outgoing_kind);
Ok(None)
} else { } else {
// Insert pdus we found // Insert pdus we found
self.db.mark_as_active(&new_events)?; self.db.mark_as_active(&new_events)?;
futures.push(Self::handle_events( Ok(Some(HandlerInputs {
outgoing_kind.clone(), kind: outgoing_kind.clone(),
new_events events: new_events
.into_iter() .into_iter()
.map(|(event, _)| event) .map(|(event, _)| event)
.collect(), .collect(),
)); }))
} }
} }
Err((outgoing_kind, _)) => { Err((outgoing_kind, _)) => {
current_transaction_status current_transaction_status.entry(outgoing_kind).and_modify(
.entry(outgoing_kind) |e| {
.and_modify(|e| {
*e = match e { *e = match e {
TransactionStatus::Running => { TransactionStatus::Running => {
TransactionStatus::Failed( TransactionStatus::Failed(1, Instant::now())
1,
Instant::now(),
)
} }
TransactionStatus::Retrying(n) => { TransactionStatus::Retrying(n) => {
TransactionStatus::Failed( TransactionStatus::Failed(
@ -217,51 +246,37 @@ impl Service {
} }
TransactionStatus::Failed(..) => { TransactionStatus::Failed(..) => {
error!( error!(
"Request that was not even \ "Request that was not even running \
running failed?!" failed?!"
); );
return; return;
} }
} }
}); },
);
Ok(None)
}
}
} }
};
Result::<_>::Ok(()) fn handle_receiver(
}; &self,
outgoing_kind: OutgoingKind,
let handle_receiver = event: SendingEventType,
|outgoing_kind, key: Vec<u8>,
event, current_transaction_status: &mut TransactionStatusMap,
key, ) -> Option<HandlerInputs> {
current_transaction_status: &mut HashMap<_, _>,
futures: &mut FuturesUnordered<_>| {
if let Ok(Some(events)) = self.select_events( if let Ok(Some(events)) = self.select_events(
&outgoing_kind, &outgoing_kind,
vec![(event, key)], vec![(event, key)],
current_transaction_status, current_transaction_status,
) { ) {
futures.push(Self::handle_events(outgoing_kind, events)); Some(HandlerInputs {
} kind: outgoing_kind,
}; events,
})
loop { } else {
select! { None
Some(response) = futures.next() =>
handle_futures(
response,
&mut current_transaction_status,
&mut futures,
)?,
Some((outgoing_kind, event, key)) = receiver.recv() =>
handle_receiver(
outgoing_kind,
event,
key,
&mut current_transaction_status,
&mut futures,
),
}
} }
} }
@ -566,7 +581,7 @@ impl Service {
async fn handle_events( async fn handle_events(
kind: OutgoingKind, kind: OutgoingKind,
events: Vec<SendingEventType>, events: Vec<SendingEventType>,
) -> Result<OutgoingKind, (OutgoingKind, Error)> { ) -> HandlerResponse {
match &kind { match &kind {
OutgoingKind::Appservice(id) => { OutgoingKind::Appservice(id) => {
let mut pdu_jsons = Vec::new(); let mut pdu_jsons = Vec::new();