diff --git a/book/changelog.md b/book/changelog.md index fb88793c..007cf51c 100644 --- a/book/changelog.md +++ b/book/changelog.md @@ -173,3 +173,6 @@ This will be the first release of Grapevine since it was forked from Conduit ([!46](https://gitlab.computer.surgery/matrix/grapevine-fork/-/merge_requests/46)) 10. Recognize the `!admin` prefix to invoke admin commands. ([!45](https://gitlab.computer.surgery/matrix/grapevine-fork/-/merge_requests/45)) +11. Add the `set-tracing-filter` admin command to change log/traces/flame + filters dynamically at runtime. + ([!49](https://gitlab.computer.surgery/matrix/grapevine-fork/-/merge_requests/49)) diff --git a/src/database.rs b/src/database.rs index 0e495475..19ab1f23 100644 --- a/src/database.rs +++ b/src/database.rs @@ -25,8 +25,9 @@ use ruma::{ use tracing::{debug, error, info, info_span, warn, Instrument}; use crate::{ - config::DatabaseBackend, service::rooms::timeline::PduCount, services, - utils, Config, Error, PduEvent, Result, Services, SERVICES, + config::DatabaseBackend, observability::FilterReloadHandles, + service::rooms::timeline::PduCount, services, utils, Config, Error, + PduEvent, Result, Services, SERVICES, }; pub(crate) struct KeyValueDatabase { @@ -310,7 +311,10 @@ impl KeyValueDatabase { allow(unreachable_code) )] #[allow(clippy::too_many_lines)] - pub(crate) async fn load_or_create(config: Config) -> Result<()> { + pub(crate) async fn load_or_create( + config: Config, + reload_handles: FilterReloadHandles, + ) -> Result<()> { Self::check_db_setup(&config)?; if !Path::new(&config.database.path).exists() { @@ -527,7 +531,8 @@ impl KeyValueDatabase { let db = Box::leak(db_raw); - let services_raw = Box::new(Services::build(db, config)?); + let services_raw = + Box::new(Services::build(db, config, reload_handles)?); // This is the first and only time we initialize the SERVICE static *SERVICES.write().unwrap() = Some(Box::leak(services_raw)); diff --git a/src/main.rs b/src/main.rs index 71197409..4f2dc417
100644 --- a/src/main.rs +++ b/src/main.rs @@ -112,7 +112,7 @@ async fn try_main() -> Result<(), error::Main> { let config = config::load(args.config.as_ref()).await?; - let _guard = observability::init(&config)?; + let (_guard, reload_handles) = observability::init(&config)?; // This is needed for opening lots of file descriptors, which tends to // happen more often when using RocksDB and making lots of federation @@ -126,7 +126,7 @@ async fn try_main() -> Result<(), error::Main> { .expect("should be able to increase the soft limit to the hard limit"); info!("Loading database"); - KeyValueDatabase::load_or_create(config) + KeyValueDatabase::load_or_create(config, reload_handles) .await .map_err(Error::DatabaseError)?; diff --git a/src/observability.rs b/src/observability.rs index 678546ba..7add9ce4 100644 --- a/src/observability.rs +++ b/src/observability.rs @@ -22,7 +22,9 @@ use opentelemetry_sdk::{ use strum::{AsRefStr, IntoStaticStr}; use tokio::time::Instant; use tracing_flame::{FlameLayer, FlushGuard}; -use tracing_subscriber::{layer::SubscriberExt, EnvFilter, Layer, Registry}; +use tracing_subscriber::{ + layer::SubscriberExt, reload, EnvFilter, Layer, Registry, +}; use crate::{ config::{Config, EnvFilterClone, LogFormat}, @@ -46,6 +48,44 @@ impl Drop for Guard { } } +/// We need to store a [`reload::Handle`] value, but can't name its type +/// explicitly because the S type parameter depends on the subscriber's previous +/// layers. In our case, this includes unnameable 'impl Trait' types. +/// +/// This is fixed[1] in the unreleased tracing-subscriber from the master +/// branch, which removes the S parameter. Unfortunately can't use it without +/// pulling in a version of tracing that's incompatible with the rest of our +/// deps. +/// +/// To work around this, we define a trait without the S parameter that forwards +/// to the [`reload::Handle::reload`] method, and then store the handle as a +/// trait object.
+/// +/// [1]: https://github.com/tokio-rs/tracing/pull/1035/commits/8a87ea52425098d3ef8f56d92358c2f6c144a28f +pub(crate) trait ReloadHandle { + /// Replace the layer with a new value. See [`reload::Handle::reload`]. + fn reload(&self, new_value: L) -> Result<(), reload::Error>; +} + +impl ReloadHandle for reload::Handle { + fn reload(&self, new_value: L) -> Result<(), reload::Error> { + reload::Handle::reload(self, new_value) + } +} + +/// A type-erased [reload handle][reload::Handle] for an [`EnvFilter`]. +pub(crate) type FilterReloadHandle = Box + Sync>; + +/// Collection of [`FilterReloadHandle`]s, allowing the filters for tracing +/// backends to be changed dynamically. Handles may be [`None`] if the backend +/// is disabled in the config. +#[allow(clippy::missing_docs_in_private_items)] +pub(crate) struct FilterReloadHandles { + pub(crate) traces: Option, + pub(crate) flame: Option, + pub(crate) log: Option, +} + /// A kind of data that gets looked up /// /// See also [`Metrics::record_lookup`]. 
@@ -95,24 +135,29 @@ fn make_backend( enable: bool, filter: &EnvFilterClone, init: impl FnOnce() -> Result<(L, T), error::Observability>, -) -> Result<(impl Layer, Option), error::Observability> +) -> Result< + (impl Layer, Option, Option), + error::Observability, +> where L: Layer, S: tracing::Subscriber + for<'span> tracing_subscriber::registry::LookupSpan<'span>, { - enable - .then(|| { - let (layer, data) = init()?; - Ok((layer.with_filter(EnvFilter::from(filter)), data)) - }) - .transpose() - .map(Option::unzip) + if !enable { + return Ok((None, None, None)); + } + + let (filter, handle) = reload::Layer::new(EnvFilter::from(filter)); + let (layer, data) = init()?; + Ok((Some(layer.with_filter(filter)), Some(Box::new(handle)), Some(data))) } /// Initialize observability -pub(crate) fn init(config: &Config) -> Result { - let (jaeger_layer, _) = make_backend( +pub(crate) fn init( + config: &Config, +) -> Result<(Guard, FilterReloadHandles), error::Observability> { + let (traces_layer, traces_filter, _) = make_backend( config.observability.traces.enable, &config.observability.traces.filter, || { @@ -135,7 +180,7 @@ pub(crate) fn init(config: &Config) -> Result { }, )?; - let (flame_layer, flame_guard) = make_backend( + let (flame_layer, flame_filter, flame_guard) = make_backend( config.observability.flame.enable, &config.observability.flame.filter, || { @@ -145,7 +190,7 @@ pub(crate) fn init(config: &Config) -> Result { }, )?; - let (fmt_layer, _) = + let (log_layer, log_filter, _) = make_backend(true, &config.observability.logs.filter, || { /// Time format selection for `tracing_subscriber` at runtime #[allow(clippy::missing_docs_in_private_items)] @@ -185,14 +230,21 @@ pub(crate) fn init(config: &Config) -> Result { })?; let subscriber = Registry::default() - .with(jaeger_layer) + .with(traces_layer) .with(flame_layer) - .with(fmt_layer); + .with(log_layer); tracing::subscriber::set_global_default(subscriber)?; - Ok(Guard { - flame_guard, - }) + Ok(( + Guard { + 
flame_guard, + }, + FilterReloadHandles { + traces: traces_filter, + flame: flame_filter, + log: log_filter, + }, + )) } /// Construct the standard [`Resource`] value to use for this service diff --git a/src/service.rs b/src/service.rs index b9eee52b..61216a61 100644 --- a/src/service.rs +++ b/src/service.rs @@ -6,7 +6,7 @@ use std::{ use lru_cache::LruCache; use tokio::sync::{broadcast, Mutex, RwLock}; -use crate::{Config, Result}; +use crate::{observability::FilterReloadHandles, Config, Result}; pub(crate) mod account_data; pub(crate) mod admin; @@ -54,6 +54,7 @@ impl Services { >( db: &'static D, config: Config, + reload_handles: FilterReloadHandles, ) -> Result { Ok(Self { appservice: appservice::Service::build(db)?, @@ -149,7 +150,7 @@ impl Services { }, sending: sending::Service::build(db, &config), - globals: globals::Service::load(db, config)?, + globals: globals::Service::load(db, config, reload_handles)?, }) } diff --git a/src/service/admin.rs b/src/service/admin.rs index 669c9f49..b1edf1a9 100644 --- a/src/service/admin.rs +++ b/src/service/admin.rs @@ -1,6 +1,6 @@ use std::{collections::BTreeMap, fmt::Write, sync::Arc, time::Instant}; -use clap::Parser; +use clap::{Parser, ValueEnum}; use regex::Regex; use ruma::{ api::appservice::Registration, @@ -196,6 +196,12 @@ enum AdminCommand { // Allowed because the doc comment gets parsed by our code later #[allow(clippy::doc_markdown)] VerifyJson, + + /// Dynamically change a tracing backend's filter string + SetTracingFilter { + backend: TracingBackend, + filter: String, + }, } #[derive(Debug)] @@ -209,6 +215,13 @@ pub(crate) struct Service { receiver: Mutex>, } +#[derive(Debug, Clone, ValueEnum)] +enum TracingBackend { + Log, + Flame, + Traces, +} + impl Service { pub(crate) fn build() -> Arc { let (sender, receiver) = mpsc::unbounded_channel(); @@ -1081,6 +1094,39 @@ impl Service { ) } } + AdminCommand::SetTracingFilter { + backend, + filter, + } => { + let handles = &services().globals.reload_handles; + 
let handle = match backend { + TracingBackend::Log => &handles.log, + TracingBackend::Flame => &handles.flame, + TracingBackend::Traces => &handles.traces, + }; + let Some(handle) = handle else { + return Ok(RoomMessageEventContent::text_plain( + "Backend is disabled", + )); + }; + let filter = match filter.parse() { + Ok(filter) => filter, + Err(e) => { + return Ok(RoomMessageEventContent::text_plain( + format!("Invalid filter string: {e}"), + )); + } + }; + if let Err(e) = handle.reload(filter) { + return Ok(RoomMessageEventContent::text_plain(format!( + "Failed to reload filter: {e}" + ))); + }; + + return Ok(RoomMessageEventContent::text_plain( + "Filter reloaded", + )); + } }; Ok(reply_message_content) diff --git a/src/service/globals.rs b/src/service/globals.rs index 681ba44a..02ae4e93 100644 --- a/src/service/globals.rs +++ b/src/service/globals.rs @@ -32,7 +32,10 @@ use tokio::sync::{broadcast, Mutex, RwLock, Semaphore}; use tracing::{error, Instrument}; use trust_dns_resolver::TokioAsyncResolver; -use crate::{api::server_server::FedDest, services, Config, Error, Result}; +use crate::{ + api::server_server::FedDest, observability::FilterReloadHandles, services, + Config, Error, Result, +}; type WellKnownMap = HashMap; type TlsNameMap = HashMap, u16)>; @@ -41,6 +44,7 @@ type RateLimitState = (Instant, u32); pub(crate) struct Service { pub(crate) db: &'static dyn Data, + pub(crate) reload_handles: FilterReloadHandles, // actual_destination, host pub(crate) actual_destination_cache: Arc>, @@ -173,7 +177,11 @@ impl Resolve for Resolver { impl Service { #[tracing::instrument(skip_all)] - pub(crate) fn load(db: &'static dyn Data, config: Config) -> Result { + pub(crate) fn load( + db: &'static dyn Data, + config: Config, + reload_handles: FilterReloadHandles, + ) -> Result { let keypair = db.load_keypair(); let keypair = match keypair { @@ -227,6 +235,7 @@ impl Service { let mut s = Self { db, config, + reload_handles, keypair: Arc::new(keypair), dns_resolver: 
TokioAsyncResolver::tokio_from_system_conf() .map_err(|e| {