From ac42e0bfff6af8677636a3dc1a56701a3255071d Mon Sep 17 00:00:00 2001 From: Lambda Date: Sun, 19 May 2024 21:39:13 +0000 Subject: [PATCH] Fix spans in tokio::spawn-ed tasks tokio::spawn is a span boundary: the spawned future has no parent span. For short futures, we simply inherit the current span with `.in_current_span()`. For long-running futures containing a sleeping infinite loop, we don't actually want a span on the entire task or even the entire loop body, as both would result in very long spans. Instead, we put the outermost span (created using #[tracing::instrument] or .instrument()) around the actual work happening after the sleep, which results in a new root span being created after every sleep. --- src/database.rs | 33 ++++++----- src/service/admin.rs | 125 +++++++++++++++++++++++------------------ src/service/sending.rs | 21 +++++++ 3 files changed, 109 insertions(+), 70 deletions(-) diff --git a/src/database.rs b/src/database.rs index 4a3ca3ff..d32b57f5 100644 --- a/src/database.rs +++ b/src/database.rs @@ -22,7 +22,7 @@ use ruma::{ CanonicalJsonValue, EventId, OwnedDeviceId, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, UserId, }; -use tracing::{debug, error, info, warn}; +use tracing::{debug, error, info, info_span, warn, Instrument}; use crate::{ service::rooms::timeline::PduCount, services, utils, Config, Error, @@ -1200,26 +1200,31 @@ impl KeyValueDatabase { loop { #[cfg(unix)] - tokio::select! { - _ = i.tick() => { + let msg = tokio::select!
{ + _ = i.tick() => || { debug!("cleanup: Timer ticked"); - } - _ = s.recv() => { + }, + _ = s.recv() => || { debug!("cleanup: Received SIGHUP"); - } + }, }; #[cfg(not(unix))] - { + let msg = { i.tick().await; - debug!("cleanup: Timer ticked") - } + || debug!("cleanup: Timer ticked") + }; - let start = Instant::now(); - if let Err(e) = services().globals.cleanup() { - error!("cleanup: Errored: {}", e); - } else { - debug!("cleanup: Finished in {:?}", start.elapsed()); + async { + msg(); + let start = Instant::now(); + if let Err(e) = services().globals.cleanup() { + error!("cleanup: Errored: {}", e); + } else { + debug!("cleanup: Finished in {:?}", start.elapsed()); + } } + .instrument(info_span!("database_cleanup")) + .await; } }); } diff --git a/src/service/admin.rs b/src/service/admin.rs index 6d1386cc..5d191fae 100644 --- a/src/service/admin.rs +++ b/src/service/admin.rs @@ -218,71 +218,84 @@ impl Service { pub(crate) fn start_handler(self: &Arc) { let self2 = Arc::clone(self); tokio::spawn(async move { - self2.handler().await; - }); - } + let mut receiver = self2.receiver.lock().await; + let grapevine_user = UserId::parse(format!( + "@{}:{}", + if services().globals.config.conduit_compat { + "conduit" + } else { + "grapevine" + }, + services().globals.server_name() + )) + .expect("admin bot username should be valid"); - async fn handler(&self) { - let mut receiver = self.receiver.lock().await; - // TODO: Use futures when we have long admin commands + let Ok(Some(grapevine_room)) = services().admin.get_admin_room() + else { + return; + }; - let grapevine_user = UserId::parse(format!( - "@{}:{}", - if services().globals.config.conduit_compat { - "conduit" - } else { - "grapevine" - }, - services().globals.server_name() - )) - .expect("admin bot username should be valid"); - - if let Ok(Some(grapevine_room)) = services().admin.get_admin_room() { loop { let event = receiver .recv() .await .expect("admin command channel has been closed"); - let message_content = 
match event { - AdminRoomEvent::SendMessage(content) => content, - AdminRoomEvent::ProcessMessage(room_message) => { - self.process_admin_message(room_message).await - } - }; - - let mutex_state = Arc::clone( - services() - .globals - .roomid_mutex_state - .write() - .await - .entry(grapevine_room.clone()) - .or_default(), - ); - - let state_lock = mutex_state.lock().await; - - services() - .rooms - .timeline - .build_and_append_pdu( - PduBuilder { - event_type: TimelineEventType::RoomMessage, - content: to_raw_value(&message_content) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: None, - redacts: None, - }, - &grapevine_user, - &grapevine_room, - &state_lock, - ) - .await - .unwrap(); + Self::handle_event( + &self2, + event, + &grapevine_room, + &grapevine_user, + ) + .await; } - } + }); + } + + #[tracing::instrument(skip(self, grapevine_room, grapevine_user))] + async fn handle_event( + &self, + event: AdminRoomEvent, + grapevine_room: &OwnedRoomId, + grapevine_user: &ruma::OwnedUserId, + ) { + let message_content = match event { + AdminRoomEvent::SendMessage(content) => content, + AdminRoomEvent::ProcessMessage(room_message) => { + self.process_admin_message(room_message).await + } + }; + + let mutex_state = Arc::clone( + services() + .globals + .roomid_mutex_state + .write() + .await + .entry(grapevine_room.clone()) + .or_default(), + ); + + let state_lock = mutex_state.lock().await; + + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: TimelineEventType::RoomMessage, + content: to_raw_value(&message_content) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: None, + redacts: None, + }, + grapevine_user, + grapevine_room, + &state_lock, + ) + .await + .unwrap(); } pub(crate) fn process_message(&self, room_message: String) { diff --git a/src/service/sending.rs b/src/service/sending.rs index 832a6f87..fd4eb2c9 100644 --- a/src/service/sending.rs +++ 
b/src/service/sending.rs @@ -100,6 +100,7 @@ pub(crate) struct Service { >, } +#[derive(Debug)] enum TransactionStatus { Running, // number of times failed, time of last failure @@ -114,6 +115,12 @@ struct HandlerInputs { } type HandlerResponse = Result; +fn outgoing_kind_from_response(response: &HandlerResponse) -> &OutgoingKind { + match response { + Ok(kind) | Err((kind, _)) => kind, + } +} + type TransactionStatusMap = HashMap; impl Service { @@ -197,6 +204,14 @@ impl Service { } } + #[tracing::instrument( + skip(self, current_transaction_status), + fields( + current_status = ?current_transaction_status.get( + outgoing_kind_from_response(&response) + ), + ), + )] fn handle_futures( &self, response: HandlerResponse, @@ -259,6 +274,12 @@ impl Service { } } + #[tracing::instrument( + skip(self, event, key, current_transaction_status), + fields( + current_status = ?current_transaction_status.get(&outgoing_kind), + ), + )] fn handle_receiver( &self, outgoing_kind: OutgoingKind,