Mirror of https://gitlab.computer.surgery/matrix/grapevine.git, synced 2025-12-17 07:41:23 +01:00
Allow tracing filters to be changed at runtime
ReloadHandle is taken from conduwuit commit 8a5599adf9eafe9111f3d1597f8fb333b8b76849, authored by Benjamin.

Co-authored-by: Benjamin Lee <benjamin@computer.surgery>
parent f576aff7eb
commit f89e1c7dfc

7 changed files with 145 additions and 29 deletions
@@ -173,3 +173,6 @@ This will be the first release of Grapevine since it was forked from Conduit
     ([!46](https://gitlab.computer.surgery/matrix/grapevine-fork/-/merge_requests/46))
 10. Recognize the `!admin` prefix to invoke admin commands.
     ([!45](https://gitlab.computer.surgery/matrix/grapevine-fork/-/merge_requests/45))
+11. Add the `set-tracing-filter` admin command to change log/metrics/flame
+    filters dynamically at runtime.
+    ([!49](https://gitlab.computer.surgery/matrix/grapevine-fork/-/merge_requests/49))
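For reference, invoking the new command in the admin room would look something like the line below; the `!admin` prefix comes from entry 10, while the backend name and filter string here are illustrative values, not ones taken from this commit:

    !admin set-tracing-filter log info,grapevine=debug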
@@ -25,8 +25,9 @@ use ruma::{
 use tracing::{debug, error, info, info_span, warn, Instrument};
 
 use crate::{
-    config::DatabaseBackend, service::rooms::timeline::PduCount, services,
-    utils, Config, Error, PduEvent, Result, Services, SERVICES,
+    config::DatabaseBackend, observability::FilterReloadHandles,
+    service::rooms::timeline::PduCount, services, utils, Config, Error,
+    PduEvent, Result, Services, SERVICES,
 };
 
 pub(crate) struct KeyValueDatabase {
@@ -310,7 +311,10 @@ impl KeyValueDatabase {
         allow(unreachable_code)
     )]
     #[allow(clippy::too_many_lines)]
-    pub(crate) async fn load_or_create(config: Config) -> Result<()> {
+    pub(crate) async fn load_or_create(
+        config: Config,
+        reload_handles: FilterReloadHandles,
+    ) -> Result<()> {
         Self::check_db_setup(&config)?;
 
         if !Path::new(&config.database.path).exists() {
@@ -527,7 +531,8 @@ impl KeyValueDatabase {
 
         let db = Box::leak(db_raw);
 
-        let services_raw = Box::new(Services::build(db, config)?);
+        let services_raw =
+            Box::new(Services::build(db, config, reload_handles)?);
 
         // This is the first and only time we initialize the SERVICE static
         *SERVICES.write().unwrap() = Some(Box::leak(services_raw));
@@ -112,7 +112,7 @@ async fn try_main() -> Result<(), error::Main> {
 
     let config = config::load(args.config.as_ref()).await?;
 
-    let _guard = observability::init(&config)?;
+    let (_guard, reload_handles) = observability::init(&config)?;
 
     // This is needed for opening lots of file descriptors, which tends to
     // happen more often when using RocksDB and making lots of federation
@@ -126,7 +126,7 @@ async fn try_main() -> Result<(), error::Main> {
         .expect("should be able to increase the soft limit to the hard limit");
 
     info!("Loading database");
-    KeyValueDatabase::load_or_create(config)
+    KeyValueDatabase::load_or_create(config, reload_handles)
         .await
         .map_err(Error::DatabaseError)?;
 
@@ -22,7 +22,9 @@ use opentelemetry_sdk::{
 use strum::{AsRefStr, IntoStaticStr};
 use tokio::time::Instant;
 use tracing_flame::{FlameLayer, FlushGuard};
-use tracing_subscriber::{layer::SubscriberExt, EnvFilter, Layer, Registry};
+use tracing_subscriber::{
+    layer::SubscriberExt, reload, EnvFilter, Layer, Registry,
+};
 
 use crate::{
     config::{Config, EnvFilterClone, LogFormat},
@@ -46,6 +48,44 @@ impl Drop for Guard {
     }
 }
 
+/// We need to store a [`reload::Handle`] value, but can't name it's type
+/// explicitly because the S type parameter depends on the subscriber's previous
+/// layers. In our case, this includes unnameable 'impl Trait' types.
+///
+/// This is fixed[1] in the unreleased tracing-subscriber from the master
+/// branch, which removes the S parameter. Unfortunately can't use it without
+/// pulling in a version of tracing that's incompatible with the rest of our
+/// deps.
+///
+/// To work around this, we define an trait without the S paramter that forwards
+/// to the [`reload::Handle::reload`] method, and then store the handle as a
+/// trait object.
+///
+/// [1]: https://github.com/tokio-rs/tracing/pull/1035/commits/8a87ea52425098d3ef8f56d92358c2f6c144a28f
+pub(crate) trait ReloadHandle<L> {
+    /// Replace the layer with a new value. See [`reload::Handle::reload`].
+    fn reload(&self, new_value: L) -> Result<(), reload::Error>;
+}
+
+impl<L, S> ReloadHandle<L> for reload::Handle<L, S> {
+    fn reload(&self, new_value: L) -> Result<(), reload::Error> {
+        reload::Handle::reload(self, new_value)
+    }
+}
+
+/// A type-erased [reload handle][reload::Handle] for an [`EnvFilter`].
+pub(crate) type FilterReloadHandle = Box<dyn ReloadHandle<EnvFilter> + Sync>;
+
+/// Collection of [`FilterReloadHandle`]s, allowing the filters for tracing
+/// backends to be changed dynamically. Handles may be [`None`] if the backend
+/// is disabled in the config.
+#[allow(clippy::missing_docs_in_private_items)]
+pub(crate) struct FilterReloadHandles {
+    pub(crate) traces: Option<FilterReloadHandle>,
+    pub(crate) flame: Option<FilterReloadHandle>,
+    pub(crate) log: Option<FilterReloadHandle>,
+}
+
 /// A kind of data that gets looked up
 ///
 /// See also [`Metrics::record_lookup`].
@@ -95,24 +135,29 @@ fn make_backend<S, L, T>(
     enable: bool,
     filter: &EnvFilterClone,
     init: impl FnOnce() -> Result<(L, T), error::Observability>,
-) -> Result<(impl Layer<S>, Option<T>), error::Observability>
+) -> Result<
+    (impl Layer<S>, Option<FilterReloadHandle>, Option<T>),
+    error::Observability,
+>
 where
     L: Layer<S>,
     S: tracing::Subscriber
        + for<'span> tracing_subscriber::registry::LookupSpan<'span>,
 {
-    enable
-        .then(|| {
-            let (layer, data) = init()?;
-            Ok((layer.with_filter(EnvFilter::from(filter)), data))
-        })
-        .transpose()
-        .map(Option::unzip)
+    if !enable {
+        return Ok((None, None, None));
+    }
+
+    let (filter, handle) = reload::Layer::new(EnvFilter::from(filter));
+    let (layer, data) = init()?;
+    Ok((Some(layer.with_filter(filter)), Some(Box::new(handle)), Some(data)))
 }
 
 /// Initialize observability
-pub(crate) fn init(config: &Config) -> Result<Guard, error::Observability> {
-    let (jaeger_layer, _) = make_backend(
+pub(crate) fn init(
+    config: &Config,
+) -> Result<(Guard, FilterReloadHandles), error::Observability> {
+    let (traces_layer, traces_filter, _) = make_backend(
         config.observability.traces.enable,
         &config.observability.traces.filter,
         || {
@@ -135,7 +180,7 @@ pub(crate) fn init(config: &Config) -> Result<Guard, error::Observability> {
         },
     )?;
 
-    let (flame_layer, flame_guard) = make_backend(
+    let (flame_layer, flame_filter, flame_guard) = make_backend(
         config.observability.flame.enable,
         &config.observability.flame.filter,
         || {
@@ -145,7 +190,7 @@ pub(crate) fn init(config: &Config) -> Result<Guard, error::Observability> {
         },
     )?;
 
-    let (fmt_layer, _) =
+    let (log_layer, log_filter, _) =
         make_backend(true, &config.observability.logs.filter, || {
             /// Time format selection for `tracing_subscriber` at runtime
             #[allow(clippy::missing_docs_in_private_items)]
@@ -185,14 +230,21 @@ pub(crate) fn init(config: &Config) -> Result<Guard, error::Observability> {
         })?;
 
     let subscriber = Registry::default()
-        .with(jaeger_layer)
+        .with(traces_layer)
         .with(flame_layer)
-        .with(fmt_layer);
+        .with(log_layer);
     tracing::subscriber::set_global_default(subscriber)?;
 
-    Ok(Guard {
-        flame_guard,
-    })
+    Ok((
+        Guard {
+            flame_guard,
+        },
+        FilterReloadHandles {
+            traces: traces_filter,
+            flame: flame_filter,
+            log: log_filter,
+        },
+    ))
 }
 
 /// Construct the standard [`Resource`] value to use for this service
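The hunks above lean on `tracing_subscriber::reload`, which wraps a layer (here, an `EnvFilter` used as a per-layer filter) so it can be swapped after the subscriber has been installed. The following is a minimal standalone sketch of that mechanism, not code from this commit; it assumes tracing-subscriber 0.3 with its default `registry` and `fmt` features plus `env-filter`, and the filter strings are made up for illustration.

```rust
// Illustrative sketch only: wrap an EnvFilter in a reload::Layer so it can be
// replaced at runtime, as Grapevine's make_backend does for each backend.
use tracing_subscriber::{layer::SubscriberExt, reload, EnvFilter, Layer, Registry};

fn main() {
    // `filter` behaves like the EnvFilter it wraps; `handle` can swap it later.
    let (filter, handle) = reload::Layer::new(EnvFilter::new("info"));

    let subscriber = Registry::default()
        .with(tracing_subscriber::fmt::layer().with_filter(filter));
    tracing::subscriber::set_global_default(subscriber)
        .expect("setting the global subscriber should succeed");

    tracing::debug!("dropped by the info filter");

    // Later (e.g. from an admin command), install a more verbose filter
    // without tearing down the subscriber.
    handle
        .reload(EnvFilter::new("debug"))
        .expect("reloading the filter should succeed");

    tracing::debug!("now emitted");
}
```

Handing such handles to the admin service, as the remaining hunks do, is what lets the filters be replaced from a chat command without restarting the server.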
@@ -6,7 +6,7 @@ use std::{
 use lru_cache::LruCache;
 use tokio::sync::{broadcast, Mutex, RwLock};
 
-use crate::{Config, Result};
+use crate::{observability::FilterReloadHandles, Config, Result};
 
 pub(crate) mod account_data;
 pub(crate) mod admin;
@@ -54,6 +54,7 @@ impl Services {
     >(
         db: &'static D,
         config: Config,
+        reload_handles: FilterReloadHandles,
     ) -> Result<Self> {
         Ok(Self {
             appservice: appservice::Service::build(db)?,
@@ -149,7 +150,7 @@ impl Services {
             },
             sending: sending::Service::build(db, &config),
 
-            globals: globals::Service::load(db, config)?,
+            globals: globals::Service::load(db, config, reload_handles)?,
         })
     }
 
@@ -1,6 +1,6 @@
 use std::{collections::BTreeMap, fmt::Write, sync::Arc, time::Instant};
 
-use clap::Parser;
+use clap::{Parser, ValueEnum};
 use regex::Regex;
 use ruma::{
     api::appservice::Registration,
@@ -196,6 +196,12 @@ enum AdminCommand {
     // Allowed because the doc comment gets parsed by our code later
     #[allow(clippy::doc_markdown)]
     VerifyJson,
+
+    /// Dynamically change a tracing backend's filter string
+    SetTracingFilter {
+        backend: TracingBackend,
+        filter: String,
+    },
 }
 
 #[derive(Debug)]
@@ -209,6 +215,13 @@ pub(crate) struct Service {
     receiver: Mutex<mpsc::UnboundedReceiver<AdminRoomEvent>>,
 }
 
+#[derive(Debug, Clone, ValueEnum)]
+enum TracingBackend {
+    Log,
+    Flame,
+    Traces,
+}
+
 impl Service {
     pub(crate) fn build() -> Arc<Self> {
         let (sender, receiver) = mpsc::unbounded_channel();
@@ -1081,6 +1094,39 @@ impl Service {
                    )
                }
            }
+            AdminCommand::SetTracingFilter {
+                backend,
+                filter,
+            } => {
+                let handles = &services().globals.reload_handles;
+                let handle = match backend {
+                    TracingBackend::Log => &handles.log,
+                    TracingBackend::Flame => &handles.flame,
+                    TracingBackend::Traces => &handles.traces,
+                };
+                let Some(handle) = handle else {
+                    return Ok(RoomMessageEventContent::text_plain(
+                        "Backend is disabled",
+                    ));
+                };
+                let filter = match filter.parse() {
+                    Ok(filter) => filter,
+                    Err(e) => {
+                        return Ok(RoomMessageEventContent::text_plain(
+                            format!("Invalid filter string: {e}"),
+                        ));
+                    }
+                };
+                if let Err(e) = handle.reload(filter) {
+                    return Ok(RoomMessageEventContent::text_plain(format!(
+                        "Failed to reload filter: {e}"
+                    )));
+                };
+
+                return Ok(RoomMessageEventContent::text_plain(
+                    "Filter reloaded",
+                ));
+            }
         };
 
         Ok(reply_message_content)
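In the arm above, `filter.parse()` produces a `tracing_subscriber::EnvFilter`, so the command accepts the usual comma-separated filter directives, and with clap's default `ValueEnum` naming the backend argument should be one of `log`, `flame`, or `traces`. The filter strings below are illustrative examples only, not values from this commit:

    info
    warn,grapevine=debug
    info,grapevine::service::sending=trace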
@@ -32,7 +32,10 @@ use tokio::sync::{broadcast, Mutex, RwLock, Semaphore};
 use tracing::{error, Instrument};
 use trust_dns_resolver::TokioAsyncResolver;
 
-use crate::{api::server_server::FedDest, services, Config, Error, Result};
+use crate::{
+    api::server_server::FedDest, observability::FilterReloadHandles, services,
+    Config, Error, Result,
+};
 
 type WellKnownMap = HashMap<OwnedServerName, (FedDest, String)>;
 type TlsNameMap = HashMap<String, (Vec<IpAddr>, u16)>;
@@ -41,6 +44,7 @@ type RateLimitState = (Instant, u32);
 
 pub(crate) struct Service {
     pub(crate) db: &'static dyn Data,
+    pub(crate) reload_handles: FilterReloadHandles,
 
     // actual_destination, host
     pub(crate) actual_destination_cache: Arc<RwLock<WellKnownMap>>,
@@ -173,7 +177,11 @@ impl Resolve for Resolver {
 
 impl Service {
     #[tracing::instrument(skip_all)]
-    pub(crate) fn load(db: &'static dyn Data, config: Config) -> Result<Self> {
+    pub(crate) fn load(
+        db: &'static dyn Data,
+        config: Config,
+        reload_handles: FilterReloadHandles,
+    ) -> Result<Self> {
         let keypair = db.load_keypair();
 
         let keypair = match keypair {
@@ -227,6 +235,7 @@ impl Service {
         let mut s = Self {
             db,
            config,
+            reload_handles,
            keypair: Arc::new(keypair),
            dns_resolver: TokioAsyncResolver::tokio_from_system_conf()
                .map_err(|e| {