Fix spans in tokio::spawn-ed tasks

tokio::spawn is a span boundary: the spawned future has no parent span.

For short futures, we simply inherit the current span with
`.in_current_span()`.
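
For illustration, a minimal sketch of that case (the task and function names here are hypothetical, not taken from this change):

    use tracing::Instrument as _;

    // Hypothetical short-lived task.
    async fn send_one_message() {
        tracing::debug!("sending");
    }

    fn spawn_short_task() {
        // tokio::spawn severs the span context, so the spawned future would
        // otherwise run without a parent span; `.in_current_span()` attaches
        // whatever span is current at the call site.
        tokio::spawn(send_one_message().in_current_span());
    }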

For long-running futures containing a sleeping infinite loop, we don't
actually want a span on the entire task or even on the entire loop body;
either would result in very long spans. Instead, we put the outermost span
(created using #[tracing::instrument] or .instrument()) around the
actual work that happens after the sleep, which results in a new root span
being created after every sleep.
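
For illustration, a minimal sketch of this pattern (names are hypothetical; the actual change below applies it to the database cleanup task and the admin command handler):

    use std::time::Duration;
    use tracing::{info_span, Instrument as _};

    // Hypothetical worker, standing in for e.g. a periodic cleanup call.
    async fn do_work() {
        tracing::debug!("doing the periodic work");
    }

    async fn run_periodic_task() {
        let mut interval = tokio::time::interval(Duration::from_secs(60));
        loop {
            // No span is entered around the sleep, so idle time never shows
            // up as one enormous span.
            interval.tick().await;

            // Only the work after the sleep is instrumented, so every
            // iteration starts a fresh root span.
            do_work().instrument(info_span!("periodic_work")).await;
        }
    }
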
Lambda 2024-05-19 21:39:13 +00:00
parent 5e9e5b76bc
commit ac42e0bfff
3 changed files with 109 additions and 70 deletions


@@ -22,7 +22,7 @@ use ruma::{
     CanonicalJsonValue, EventId, OwnedDeviceId, OwnedEventId, OwnedRoomId,
     OwnedUserId, RoomId, UserId,
 };
-use tracing::{debug, error, info, warn};
+use tracing::{debug, error, info, info_span, warn, Instrument};
 
 use crate::{
     service::rooms::timeline::PduCount, services, utils, Config, Error,
@@ -1200,26 +1200,31 @@ impl KeyValueDatabase {
             loop {
                 #[cfg(unix)]
-                tokio::select! {
-                    _ = i.tick() => {
+                let msg = tokio::select! {
+                    _ = i.tick() => || {
                         debug!("cleanup: Timer ticked");
-                    }
-                    _ = s.recv() => {
+                    },
+                    _ = s.recv() => || {
                         debug!("cleanup: Received SIGHUP");
-                    }
+                    },
                 };
                 #[cfg(not(unix))]
-                {
+                let msg = {
                     i.tick().await;
-                    debug!("cleanup: Timer ticked")
-                }
+                    || debug!("cleanup: Timer ticked")
+                };
 
-                let start = Instant::now();
-                if let Err(e) = services().globals.cleanup() {
-                    error!("cleanup: Errored: {}", e);
-                } else {
-                    debug!("cleanup: Finished in {:?}", start.elapsed());
+                async {
+                    msg();
+                    let start = Instant::now();
+                    if let Err(e) = services().globals.cleanup() {
+                        error!("cleanup: Errored: {}", e);
+                    } else {
+                        debug!("cleanup: Finished in {:?}", start.elapsed());
+                    }
                 }
+                .instrument(info_span!("database_cleanup"))
+                .await;
             }
         });
     }


@@ -218,71 +218,84 @@ impl Service {
     pub(crate) fn start_handler(self: &Arc<Self>) {
         let self2 = Arc::clone(self);
         tokio::spawn(async move {
-            self2.handler().await;
-        });
-    }
-
-    async fn handler(&self) {
-        let mut receiver = self.receiver.lock().await;
-        // TODO: Use futures when we have long admin commands
-
-        let grapevine_user = UserId::parse(format!(
-            "@{}:{}",
-            if services().globals.config.conduit_compat {
-                "conduit"
-            } else {
-                "grapevine"
-            },
-            services().globals.server_name()
-        ))
-        .expect("admin bot username should be valid");
-
-        if let Ok(Some(grapevine_room)) = services().admin.get_admin_room() {
+            let mut receiver = self2.receiver.lock().await;
+
+            let grapevine_user = UserId::parse(format!(
+                "@{}:{}",
+                if services().globals.config.conduit_compat {
+                    "conduit"
+                } else {
+                    "grapevine"
+                },
+                services().globals.server_name()
+            ))
+            .expect("admin bot username should be valid");
+
+            let Ok(Some(grapevine_room)) = services().admin.get_admin_room()
+            else {
+                return;
+            };
+
             loop {
                 let event = receiver
                     .recv()
                     .await
                     .expect("admin command channel has been closed");
 
-                let message_content = match event {
-                    AdminRoomEvent::SendMessage(content) => content,
-                    AdminRoomEvent::ProcessMessage(room_message) => {
-                        self.process_admin_message(room_message).await
-                    }
-                };
-
-                let mutex_state = Arc::clone(
-                    services()
-                        .globals
-                        .roomid_mutex_state
-                        .write()
-                        .await
-                        .entry(grapevine_room.clone())
-                        .or_default(),
-                );
-
-                let state_lock = mutex_state.lock().await;
-
-                services()
-                    .rooms
-                    .timeline
-                    .build_and_append_pdu(
-                        PduBuilder {
-                            event_type: TimelineEventType::RoomMessage,
-                            content: to_raw_value(&message_content)
-                                .expect("event is valid, we just created it"),
-                            unsigned: None,
-                            state_key: None,
-                            redacts: None,
-                        },
-                        &grapevine_user,
-                        &grapevine_room,
-                        &state_lock,
-                    )
-                    .await
-                    .unwrap();
+                Self::handle_event(
+                    &self2,
+                    event,
+                    &grapevine_room,
+                    &grapevine_user,
+                )
+                .await;
             }
-        }
+        });
+    }
+
+    #[tracing::instrument(skip(self, grapevine_room, grapevine_user))]
+    async fn handle_event(
+        &self,
+        event: AdminRoomEvent,
+        grapevine_room: &OwnedRoomId,
+        grapevine_user: &ruma::OwnedUserId,
+    ) {
+        let message_content = match event {
+            AdminRoomEvent::SendMessage(content) => content,
+            AdminRoomEvent::ProcessMessage(room_message) => {
+                self.process_admin_message(room_message).await
+            }
+        };
+
+        let mutex_state = Arc::clone(
+            services()
+                .globals
+                .roomid_mutex_state
+                .write()
+                .await
+                .entry(grapevine_room.clone())
+                .or_default(),
+        );
+
+        let state_lock = mutex_state.lock().await;
+
+        services()
+            .rooms
+            .timeline
+            .build_and_append_pdu(
+                PduBuilder {
+                    event_type: TimelineEventType::RoomMessage,
+                    content: to_raw_value(&message_content)
+                        .expect("event is valid, we just created it"),
+                    unsigned: None,
+                    state_key: None,
+                    redacts: None,
+                },
+                grapevine_user,
+                grapevine_room,
+                &state_lock,
+            )
+            .await
+            .unwrap();
     }
 
     pub(crate) fn process_message(&self, room_message: String) {


@@ -100,6 +100,7 @@ pub(crate) struct Service {
     >,
 }
 
+#[derive(Debug)]
 enum TransactionStatus {
     Running,
     // number of times failed, time of last failure
@@ -114,6 +115,12 @@ struct HandlerInputs {
 }
 
 type HandlerResponse = Result<OutgoingKind, (OutgoingKind, Error)>;
 
+fn outgoing_kind_from_response(response: &HandlerResponse) -> &OutgoingKind {
+    match response {
+        Ok(kind) | Err((kind, _)) => kind,
+    }
+}
+
 type TransactionStatusMap = HashMap<OutgoingKind, TransactionStatus>;
 
 impl Service {
@@ -197,6 +204,14 @@ impl Service {
         }
     }
 
+    #[tracing::instrument(
+        skip(self, current_transaction_status),
+        fields(
+            current_status = ?current_transaction_status.get(
+                outgoing_kind_from_response(&response)
+            ),
+        ),
+    )]
     fn handle_futures(
         &self,
         response: HandlerResponse,
@@ -259,6 +274,12 @@ impl Service {
         }
     }
 
+    #[tracing::instrument(
+        skip(self, event, key, current_transaction_status),
+        fields(
+            current_status = ?current_transaction_status.get(&outgoing_kind),
+        ),
+    )]
    fn handle_receiver(
        &self,
        outgoing_kind: OutgoingKind,