mirror of
https://gitlab.computer.surgery/matrix/grapevine.git
synced 2025-12-17 07:41:23 +01:00
refactor handle_response in service/sending
Early returns good.
This commit is contained in:
parent
f2e5b281c9
commit
e13db834ed
1 changed files with 38 additions and 48 deletions
|
|
@ -256,57 +256,47 @@ impl Service {
|
||||||
Span::current().record("error", e.to_string());
|
Span::current().record("error", e.to_string());
|
||||||
}
|
}
|
||||||
|
|
||||||
match result {
|
if let Err(error) = result {
|
||||||
Ok(()) => {
|
warn!(%error, "Marking transaction as failed");
|
||||||
self.db.delete_all_active_requests_for(&destination)?;
|
current_transaction_status.entry(destination).and_modify(|e| {
|
||||||
|
use TransactionStatus::{Failed, Retrying, Running};
|
||||||
|
|
||||||
// Find events that have been added since starting the
|
*e = match e {
|
||||||
// last request
|
Running => Failed(1, Instant::now()),
|
||||||
let new_events = self
|
Retrying(n) => Failed(*n + 1, Instant::now()),
|
||||||
.db
|
Failed(..) => {
|
||||||
.queued_requests(&destination)
|
error!("Request that was not even running failed?!");
|
||||||
.filter_map(Result::ok)
|
return;
|
||||||
.take(30)
|
|
||||||
.collect::<Vec<_>>();
|
|
||||||
|
|
||||||
if new_events.is_empty() {
|
|
||||||
current_transaction_status.remove(&destination);
|
|
||||||
Ok(None)
|
|
||||||
} else {
|
|
||||||
// Insert pdus we found
|
|
||||||
self.db.mark_as_active(&new_events)?;
|
|
||||||
|
|
||||||
Ok(Some(HandlerInputs {
|
|
||||||
destination: destination.clone(),
|
|
||||||
events: new_events
|
|
||||||
.into_iter()
|
|
||||||
.map(|(event, _)| event)
|
|
||||||
.collect(),
|
|
||||||
requester_span: None,
|
|
||||||
}))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(error) => {
|
|
||||||
warn!(%error, "Marking transaction as failed");
|
|
||||||
current_transaction_status.entry(destination).and_modify(|e| {
|
|
||||||
*e = match e {
|
|
||||||
TransactionStatus::Running => {
|
|
||||||
TransactionStatus::Failed(1, Instant::now())
|
|
||||||
}
|
|
||||||
TransactionStatus::Retrying(n) => {
|
|
||||||
TransactionStatus::Failed(*n + 1, Instant::now())
|
|
||||||
}
|
|
||||||
TransactionStatus::Failed(..) => {
|
|
||||||
error!(
|
|
||||||
"Request that was not even running failed?!"
|
|
||||||
);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
});
|
}
|
||||||
Ok(None)
|
});
|
||||||
}
|
return Ok(None);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
self.db.delete_all_active_requests_for(&destination)?;
|
||||||
|
|
||||||
|
// Find events that have been added since starting the
|
||||||
|
// last request
|
||||||
|
let new_events = self
|
||||||
|
.db
|
||||||
|
.queued_requests(&destination)
|
||||||
|
.filter_map(Result::ok)
|
||||||
|
.take(30)
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
|
if new_events.is_empty() {
|
||||||
|
current_transaction_status.remove(&destination);
|
||||||
|
return Ok(None);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Insert pdus we found
|
||||||
|
self.db.mark_as_active(&new_events)?;
|
||||||
|
|
||||||
|
Ok(Some(HandlerInputs {
|
||||||
|
destination: destination.clone(),
|
||||||
|
events: new_events.into_iter().map(|(event, _)| event).collect(),
|
||||||
|
requester_span: None,
|
||||||
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(
|
#[tracing::instrument(
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue