mirror of
https://gitlab.computer.surgery/matrix/grapevine.git
synced 2025-12-17 07:41:23 +01:00
factor select bodies into closures
I would've preferred to factor these out into their own functions, but unfortunately the inner type of the `FuturesUnordered` is unnameable. `Box` or TAIT would help, but the former has a performance cost and the latter doesn't exist on stable yet.
This commit is contained in:
parent
ac53948450
commit
6bc17b268c
1 changed files with 80 additions and 40 deletions
|
|
@ -123,6 +123,7 @@ impl Service {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[allow(clippy::too_many_lines)]
|
||||||
async fn handler(&self) -> Result<()> {
|
async fn handler(&self) -> Result<()> {
|
||||||
let mut receiver = self.receiver.lock().await;
|
let mut receiver = self.receiver.lock().await;
|
||||||
|
|
||||||
|
|
@ -155,15 +156,21 @@ impl Service {
|
||||||
futures.push(Self::handle_events(outgoing_kind.clone(), events));
|
futures.push(Self::handle_events(outgoing_kind.clone(), events));
|
||||||
}
|
}
|
||||||
|
|
||||||
loop {
|
let handle_futures = |response,
|
||||||
select! {
|
current_transaction_status: &mut HashMap<_, _>,
|
||||||
Some(response) = futures.next() => {
|
futures: &mut FuturesUnordered<_>| {
|
||||||
match response {
|
match response {
|
||||||
Ok(outgoing_kind) => {
|
Ok(outgoing_kind) => {
|
||||||
self.db.delete_all_active_requests_for(&outgoing_kind)?;
|
self.db.delete_all_active_requests_for(&outgoing_kind)?;
|
||||||
|
|
||||||
// Find events that have been added since starting the last request
|
// Find events that have been added since starting the
|
||||||
let new_events = self.db.queued_requests(&outgoing_kind).filter_map(Result::ok).take(30).collect::<Vec<_>>();
|
// last request
|
||||||
|
let new_events = self
|
||||||
|
.db
|
||||||
|
.queued_requests(&outgoing_kind)
|
||||||
|
.filter_map(Result::ok)
|
||||||
|
.take(30)
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
if new_events.is_empty() {
|
if new_events.is_empty() {
|
||||||
current_transaction_status.remove(&outgoing_kind);
|
current_transaction_status.remove(&outgoing_kind);
|
||||||
|
|
@ -171,35 +178,68 @@ impl Service {
|
||||||
// Insert pdus we found
|
// Insert pdus we found
|
||||||
self.db.mark_as_active(&new_events)?;
|
self.db.mark_as_active(&new_events)?;
|
||||||
|
|
||||||
futures.push(
|
futures.push(Self::handle_events(
|
||||||
Self::handle_events(
|
|
||||||
outgoing_kind.clone(),
|
outgoing_kind.clone(),
|
||||||
new_events.into_iter().map(|(event, _)| event).collect(),
|
new_events.into_iter().map(|(event, _)| event).collect(),
|
||||||
)
|
));
|
||||||
);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err((outgoing_kind, _)) => {
|
Err((outgoing_kind, _)) => {
|
||||||
current_transaction_status.entry(outgoing_kind).and_modify(|e| *e = match e {
|
current_transaction_status
|
||||||
TransactionStatus::Running => TransactionStatus::Failed(1, Instant::now()),
|
.entry(outgoing_kind)
|
||||||
TransactionStatus::Retrying(n) => TransactionStatus::Failed(*n+1, Instant::now()),
|
.and_modify(|e| {
|
||||||
TransactionStatus::Failed(_, _) => {
|
*e = match e {
|
||||||
error!("Request that was not even running failed?!");
|
TransactionStatus::Running => {
|
||||||
return
|
TransactionStatus::Failed(1, Instant::now())
|
||||||
},
|
}
|
||||||
|
TransactionStatus::Retrying(n) => {
|
||||||
|
TransactionStatus::Failed(*n + 1, Instant::now())
|
||||||
|
}
|
||||||
|
TransactionStatus::Failed(..) => {
|
||||||
|
error!(
|
||||||
|
"Request that was not even \
|
||||||
|
running failed?!"
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
},
|
|
||||||
Some((outgoing_kind, event, key)) = receiver.recv() => {
|
Result::<_>::Ok(())
|
||||||
|
};
|
||||||
|
|
||||||
|
let handle_receiver = |outgoing_kind,
|
||||||
|
event,
|
||||||
|
key,
|
||||||
|
current_transaction_status: &mut HashMap<_, _>,
|
||||||
|
futures: &mut FuturesUnordered<_>| {
|
||||||
if let Ok(Some(events)) = self.select_events(
|
if let Ok(Some(events)) = self.select_events(
|
||||||
&outgoing_kind,
|
&outgoing_kind,
|
||||||
vec![(event, key)],
|
vec![(event, key)],
|
||||||
&mut current_transaction_status,
|
current_transaction_status,
|
||||||
) {
|
) {
|
||||||
futures.push(Self::handle_events(outgoing_kind, events));
|
futures.push(Self::handle_events(outgoing_kind, events));
|
||||||
}
|
}
|
||||||
}
|
};
|
||||||
|
|
||||||
|
loop {
|
||||||
|
select! {
|
||||||
|
Some(response) = futures.next() =>
|
||||||
|
handle_futures(
|
||||||
|
response,
|
||||||
|
&mut current_transaction_status,
|
||||||
|
&mut futures,
|
||||||
|
)?,
|
||||||
|
Some((outgoing_kind, event, key)) = receiver.recv() =>
|
||||||
|
handle_receiver(
|
||||||
|
outgoing_kind,
|
||||||
|
event,
|
||||||
|
key,
|
||||||
|
&mut current_transaction_status,
|
||||||
|
&mut futures,
|
||||||
|
),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue