diff --git a/src/service/sending.rs b/src/service/sending.rs index ec4768bf..307824af 100644 --- a/src/service/sending.rs +++ b/src/service/sending.rs @@ -35,7 +35,7 @@ use tokio::{ select, sync::{mpsc, Mutex, Semaphore}, }; -use tracing::{debug, error, warn}; +use tracing::{debug, error, warn, Span}; use crate::{ api::{appservice_server, server_server}, @@ -105,6 +105,8 @@ pub(crate) struct RequestData { destination: Destination, event_type: SendingEventType, key: RequestKey, + /// Span of the original `send_*()` method call + requester_span: Span, } pub(crate) struct Service { @@ -128,17 +130,27 @@ enum TransactionStatus { struct HandlerInputs { destination: Destination, events: Vec, + /// Span of the original `send_*()` method call, if known (gets lost when + /// event is persisted to database) + requester_span: Option, } -type HandlerResponse = Result; -fn destination_from_response(response: &HandlerResponse) -> &Destination { - match response { - Ok(kind) | Err((kind, _)) => kind, - } +#[derive(Debug)] +struct HandlerResponse { + destination: Destination, + result: Result<()>, + /// The span of the just-completed handler, for follows-from relationships. + handler_span: Span, } type TransactionStatusMap = HashMap; +enum SelectedEvents { + None, + Retries(Vec), + New(Vec), +} + impl Service { pub(crate) fn build(db: &'static dyn Data, config: &Config) -> Arc { let (sender, receiver) = mpsc::unbounded_channel(); @@ -191,30 +203,28 @@ impl Service { for (destination, events) in initial_transactions { current_transaction_status .insert(destination.clone(), TransactionStatus::Running); - futures.push(handle_events(destination.clone(), events)); + futures.push(handle_events(HandlerInputs { + destination: destination.clone(), + events, + requester_span: None, + })); } loop { select! { Some(response) = futures.next() => { - if let Some(HandlerInputs { - destination, - events, - }) = self.handle_response( + if let Some(inputs) = self.handle_response( response, &mut current_transaction_status, )? { - futures.push(handle_events(destination, events)); + futures.push(handle_events(inputs)); } } Some(data) = receiver.recv() => { - if let Some(HandlerInputs { - destination, - events, - }) = self - .handle_receiver(data, &mut current_transaction_status) - { - futures.push(handle_events(destination, events)); + if let Some(inputs) = self.handle_receiver( + data, &mut current_transaction_status + ) { + futures.push(handle_events(inputs)); } } } @@ -222,20 +232,27 @@ impl Service { } #[tracing::instrument( - skip(self, current_transaction_status), + skip(self, handler_span, current_transaction_status), fields( current_status = ?current_transaction_status.get( - destination_from_response(&response) + &destination ), ), )] fn handle_response( &self, - response: HandlerResponse, + HandlerResponse { + destination, + result, + handler_span, + }: HandlerResponse, current_transaction_status: &mut TransactionStatusMap, ) -> Result> { - match response { - Ok(destination) => { + // clone() is required for the relationship to show up in jaeger + Span::current().follows_from(handler_span.clone()); + + match result { + Ok(()) => { self.db.delete_all_active_requests_for(&destination)?; // Find events that have been added since starting the @@ -260,10 +277,12 @@ impl Service { .into_iter() .map(|(event, _)| event) .collect(), + requester_span: None, })) } } - Err((destination, _)) => { + Err(_err) => { + warn!("Marking transaction as failed"); current_transaction_status.entry(destination).and_modify(|e| { *e = match e { TransactionStatus::Running => { @@ -286,7 +305,7 @@ impl Service { } #[tracing::instrument( - skip(self, event_type, key, current_transaction_status), + skip(self, event_type, key, requester_span, current_transaction_status), fields( current_status = ?current_transaction_status.get(&destination), ), @@ -297,20 +316,42 @@ impl Service { destination, event_type, key, + requester_span, }: RequestData, current_transaction_status: &mut TransactionStatusMap, ) -> Option { - if let Ok(Some(events)) = self.select_events( + // clone() is required for the relationship to show up in jaeger + Span::current().follows_from(requester_span.clone()); + + match self.select_events( &destination, vec![(event_type, key)], current_transaction_status, ) { - Some(HandlerInputs { - destination, - events, - }) - } else { - None + Ok(SelectedEvents::Retries(events)) => { + debug!("retrying old events"); + Some(HandlerInputs { + destination, + events, + requester_span: None, + }) + } + Ok(SelectedEvents::New(events)) => { + debug!("sending new event"); + Some(HandlerInputs { + destination, + events, + requester_span: Some(requester_span), + }) + } + Ok(SelectedEvents::None) => { + debug!("holding off from sending any events"); + None + } + Err(error) => { + error!(%error, "Failed to select events to send"); + None + } } } @@ -330,7 +371,7 @@ impl Service { Destination, TransactionStatus, >, - ) -> Result>> { + ) -> Result { let mut retry = false; let mut allow = true; @@ -364,19 +405,22 @@ impl Service { .or_insert(TransactionStatus::Running); if !allow { - return Ok(None); + return Ok(SelectedEvents::None); } - let mut events = Vec::new(); - if retry { // We retry the previous transaction - for (_, e) in - self.db.active_requests_for(destination).filter_map(Result::ok) - { - events.push(e); - } + let events = self + .db + .active_requests_for(destination) + .filter_map(Result::ok) + .map(|(_, e)| e) + .collect(); + + Ok(SelectedEvents::Retries(events)) } else { + let mut events = Vec::new(); + self.db.mark_as_active(&new_events)?; for (e, _) in new_events { events.push(e); @@ -393,9 +437,9 @@ impl Service { self.db.set_latest_educount(server_name, last_count)?; } } - } - Ok(Some(events)) + Ok(SelectedEvents::New(events)) + } } #[tracing::instrument(skip(self))] @@ -537,6 +581,7 @@ impl Service { destination, event_type, key: keys.into_iter().next().unwrap(), + requester_span: Span::current(), }) .unwrap(); @@ -567,6 +612,7 @@ impl Service { destination: destination.clone(), event_type, key, + requester_span: Span::current(), }) .unwrap(); } @@ -590,6 +636,7 @@ impl Service { destination, event_type, key: keys.into_iter().next().unwrap(), + requester_span: Span::current(), }) .unwrap(); @@ -611,6 +658,7 @@ impl Service { destination, event_type, key: keys.into_iter().next().unwrap(), + requester_span: Span::current(), }) .unwrap(); @@ -883,10 +931,18 @@ async fn handle_federation_event( #[tracing::instrument(skip_all)] async fn handle_events( - destination: Destination, - events: Vec, + HandlerInputs { + destination, + events, + requester_span, + }: HandlerInputs, ) -> HandlerResponse { - let ret = match &destination { + if let Some(span) = requester_span { + // clone() is required for the relationship to show up in jaeger + Span::current().follows_from(span.clone()); + } + + let result = match &destination { Destination::Appservice(id) => { handle_appservice_event(id, events).await } @@ -898,8 +954,9 @@ async fn handle_events( } }; - match ret { - Ok(()) => Ok(destination), - Err(e) => Err((destination, e)), + HandlerResponse { + destination, + result, + handler_span: Span::current(), } }