//! Facilities for observing runtime behavior #![warn(missing_docs, clippy::missing_docs_in_private_items)] use std::{ collections::HashSet, fs::File, io::BufWriter, sync::{Arc, LazyLock}, }; use axum::{ extract::{MatchedPath, Request}, middleware::Next, response::Response, }; use http::Method; use opentelemetry::{metrics::MeterProvider, trace::TracerProvider, KeyValue}; use opentelemetry_otlp::WithExportConfig; use opentelemetry_sdk::{ metrics::{new_view, Aggregation, Instrument, SdkMeterProvider, Stream}, Resource, }; use strum::{AsRefStr, IntoStaticStr}; use thiserror::Error; use tokio::time::Instant; use tracing::{subscriber::SetGlobalDefaultError, Span}; use tracing_flame::{FlameLayer, FlushGuard}; use tracing_opentelemetry::OtelData; use tracing_subscriber::{ layer::SubscriberExt, registry::{LookupSpan, SpanData}, reload, EnvFilter, Layer, Registry, }; use crate::{ config::{Config, EnvFilterClone, LogFormat}, error, utils::error::Result, }; /// Globally accessible metrics state pub(crate) static METRICS: LazyLock = LazyLock::new(Metrics::new); /// Cleans up resources relating to observability when [`Drop`]ped pub(crate) struct Guard { /// Drop guard used to flush [`tracing_flame`] data on exit #[allow(dead_code)] flame_guard: Option>>, } impl Drop for Guard { fn drop(&mut self) { opentelemetry::global::shutdown_tracer_provider(); } } /// We need to store a [`reload::Handle`] value, but can't name it's type /// explicitly because the S type parameter depends on the subscriber's previous /// layers. In our case, this includes unnameable 'impl Trait' types. /// /// This is fixed[1] in the unreleased tracing-subscriber from the master /// branch, which removes the S parameter. Unfortunately can't use it without /// pulling in a version of tracing that's incompatible with the rest of our /// deps. 
///
/// To work around this, we define a trait without the S parameter that forwards
/// to the [`reload::Handle::reload`] method, and then store the handle as a
/// trait object.
///
/// [1]: https://github.com/tokio-rs/tracing/pull/1035/commits/8a87ea52425098d3ef8f56d92358c2f6c144a28f
pub(crate) trait ReloadHandle<L> {
    /// Replace the layer with a new value. See [`reload::Handle::reload`].
    fn reload(&self, new_value: L) -> Result<(), reload::Error>;
}

impl<L, S> ReloadHandle<L> for reload::Handle<L, S> {
    fn reload(&self, new_value: L) -> Result<(), reload::Error> {
        // Forward to the inherent `reload::Handle::reload` method.
        reload::Handle::reload(self, new_value)
    }
}

/// Error returned from [`FilterReloadHandle::set_filter()`]
#[allow(clippy::missing_docs_in_private_items)]
#[derive(Debug, Error)]
pub(crate) enum SetFilterError {
    #[error("invalid filter string")]
    InvalidFilter(#[from] tracing_subscriber::filter::ParseError),

    #[error("failed to reload filter layer")]
    Reload(#[from] reload::Error),
}

/// A wrapper around a tracing filter [reload handle][reload::Handle] that
/// remembers the filter string that was last set.
pub(crate) struct FilterReloadHandle {
    /// The actual [`reload::Handle`] that can be used to modify the filter
    /// [`Layer`]
    inner: Box<dyn ReloadHandle<EnvFilter> + Send + Sync>,

    /// Filter string that was last applied to `inner`
    current_filter: String,

    /// Filter string that was initially loaded from the configuration
    initial_filter: String,
}

impl FilterReloadHandle {
    /// Creates a new [`FilterReloadHandle`] from a filter string, returning the
    /// filter layer itself and the handle that can be used to modify it.
    ///
    /// Both the current and the initial filter strings start out as the
    /// string held by `filter`.
    pub(crate) fn new<S: tracing::Subscriber>(
        filter: EnvFilterClone,
    ) -> (impl tracing_subscriber::layer::Filter<S>, Self) {
        let (layer, handle) = reload::Layer::new(EnvFilter::from(&filter));
        let handle = Self {
            inner: Box::new(handle),
            current_filter: filter.0.clone(),
            initial_filter: filter.0,
        };
        (layer, handle)
    }

    /// Sets the filter string for the linked filter layer. Can fail if the
    /// filter string is invalid or when the link to the layer has been
    /// broken.
    ///
    /// The remembered filter string (see [`Self::get_filter`]) is only
    /// updated when the reload succeeds.
    ///
    /// # Errors
    ///
    /// - [`SetFilterError::InvalidFilter`] if `filter` fails to parse.
    /// - [`SetFilterError::Reload`] if the filter layer could not be reloaded.
    pub(crate) fn set_filter(
        &mut self,
        filter: String,
    ) -> Result<(), SetFilterError> {
        self.inner.reload(filter.parse()?)?;
        self.current_filter = filter;
        Ok(())
    }

    /// Returns the filter string that the underlying filter layer is currently
    /// configured for.
    pub(crate) fn get_filter(&self) -> &str {
        &self.current_filter
    }

    /// Returns the filter string that the underlying filter layer was
    /// initialized with.
    pub(crate) fn get_initial_filter(&self) -> &str {
        &self.initial_filter
    }
}

/// Collection of [`FilterReloadHandle`]s, allowing the filters for tracing
/// backends to be changed dynamically. Handles may be [`None`] if the backend
/// is disabled in the config.
#[allow(clippy::missing_docs_in_private_items)]
pub(crate) struct FilterReloadHandles {
    pub(crate) traces: Option<FilterReloadHandle>,
    pub(crate) flame: Option<FilterReloadHandle>,
    pub(crate) log: Option<FilterReloadHandle>,
}

/// A kind of data that gets looked up
///
/// See also [`Metrics::record_lookup`].
// Keep variants sorted
#[allow(clippy::missing_docs_in_private_items)]
#[derive(Clone, Copy, Debug, AsRefStr, IntoStaticStr)]
pub(crate) enum Lookup {
    AppserviceInRoom,
    AuthChain,
    CreateEventIdToShort,
    CreateStateKeyToShort,
    FederationDestination,
    LastTimelineCount,
    OurRealUsers,
    Pdu,
    ShortToEventId,
    ShortToStateKey,
    StateInfo,
    StateKeyToShort,
    VisibilityForServer,
    VisibilityForUser,
}

/// Locations where a [`Lookup`] value may be found
///
/// Not all of these variants are used for each value of [`Lookup`].
#[derive(Clone, Copy, Debug, AsRefStr, IntoStaticStr)]
pub(crate) enum FoundIn {
    /// Found in cache
    Cache,
    /// Cache miss, but it was in the database. The cache has been updated.
    Database,
    /// Cache and database miss, but another server had it. The cache has been
    /// updated.
    Remote,
    /// The entry could not be found anywhere.
    Nothing,
}

/// Wrapper for the creation of a `tracing` [`Layer`] and any associated opaque
/// data.
/// /// Returns a no-op `None` layer if `enable` is `false`, otherwise calls the /// given closure to construct the layer and associated data, then applies the /// filter to the layer. fn make_backend( enable: bool, filter: &EnvFilterClone, init: impl FnOnce() -> Result<(L, T), error::Observability>, ) -> Result< (impl Layer, Option, Option), error::Observability, > where L: Layer, S: tracing::Subscriber + for<'span> LookupSpan<'span>, { if !enable { return Ok((None, None, None)); } let (filter, handle) = FilterReloadHandle::new(filter.clone()); let (layer, data) = init()?; Ok((Some(layer.with_filter(filter)), Some(handle), Some(data))) } /// Initialize observability pub(crate) fn init( config: &Config, ) -> Result<(Guard, FilterReloadHandles), error::Observability> { let (traces_layer, traces_filter, _) = make_backend( config.observability.traces.enable, &config.observability.traces.filter, || { opentelemetry::global::set_text_map_propagator( opentelemetry_jaeger_propagator::Propagator::new(), ); let mut exporter = opentelemetry_otlp::new_exporter().tonic(); if let Some(endpoint) = &config.observability.traces.endpoint { exporter = exporter.with_endpoint(endpoint); } let tracer_provider = opentelemetry_otlp::new_pipeline() .tracing() .with_trace_config( opentelemetry_sdk::trace::Config::default().with_resource( standard_resource( config.observability.traces.service_name.clone(), ), ), ) .with_exporter(exporter) .install_batch(opentelemetry_sdk::runtime::Tokio)?; // The passed value sets the library name, and `""` seems to be // morally equivalent to passing `None`, which is probably fine // because what other library is there to use for this anyway? // // Prior to opentelemetry v0.24, this value was set for us by the // opentelemetry-otlp crate. Trying to automate getting the right // values doesn't seem worth it, as alluded to above. 
let tracer = tracer_provider.tracer(""); opentelemetry::global::set_tracer_provider(tracer_provider); Ok((tracing_opentelemetry::layer().with_tracer(tracer), ())) }, )?; let (flame_layer, flame_filter, flame_guard) = make_backend( config.observability.flame.enable, &config.observability.flame.filter, || { let (flame_layer, guard) = FlameLayer::with_file(&config.observability.flame.filename)?; Ok((flame_layer.with_empty_samples(false), guard)) }, )?; let (log_layer, log_filter, _) = make_backend(true, &config.observability.logs.filter, || { /// Time format selection for `tracing_subscriber` at runtime #[allow(clippy::missing_docs_in_private_items)] enum TimeFormat { SystemTime, NoTime, } impl tracing_subscriber::fmt::time::FormatTime for TimeFormat { fn format_time( &self, w: &mut tracing_subscriber::fmt::format::Writer<'_>, ) -> std::fmt::Result { match self { TimeFormat::SystemTime => { tracing_subscriber::fmt::time::SystemTime .format_time(w) } TimeFormat::NoTime => Ok(()), } } } let fmt_layer = tracing_subscriber::fmt::Layer::new() .with_ansi(config.observability.logs.colors) .with_timer(if config.observability.logs.timestamp { TimeFormat::SystemTime } else { TimeFormat::NoTime }); let fmt_layer = match config.observability.logs.format { LogFormat::Pretty => fmt_layer.pretty().boxed(), LogFormat::Full => fmt_layer.boxed(), LogFormat::Compact => fmt_layer.compact().boxed(), LogFormat::Json => fmt_layer.json().boxed(), }; Ok((fmt_layer, ())) })?; let subscriber = Registry::default() .with(traces_layer) .with(flame_layer) .with(log_layer); tracing::subscriber::set_global_default(subscriber)?; Ok(( Guard { flame_guard, }, FilterReloadHandles { traces: traces_filter, flame: flame_filter, log: log_filter, }, )) } /// Construct the standard [`Resource`] value to use for this service fn standard_resource(service_name: String) -> Resource { Resource::default() .merge(&Resource::new([KeyValue::new("service.name", service_name)])) } /// Holds state relating to metrics 
pub(crate) struct Metrics { /// Internal state for OpenTelemetry metrics /// /// We never directly read from [`SdkMeterProvider`], but it needs to /// outlive all calls to `self.otel_state.0.gather()`, otherwise /// metrics collection will fail. otel_state: (prometheus::Registry, SdkMeterProvider), /// Histogram of HTTP requests http_requests_histogram: opentelemetry::metrics::Histogram, /// Counts where data is found from lookup: opentelemetry::metrics::Counter, /// Number of entries in an /// [`OnDemandHashMap`](crate::utils::on_demand_hashmap::OnDemandHashMap) on_demand_hashmap_size: opentelemetry::metrics::Gauge, } impl Metrics { /// Initializes metric-collecting and exporting facilities fn new() -> Self { // Metric names let http_requests_histogram_name = "http.requests"; // Set up OpenTelemetry state let registry = prometheus::Registry::new(); let exporter = opentelemetry_prometheus::exporter() .with_registry(registry.clone()) .build() .expect("exporter configuration should be valid"); let provider = SdkMeterProvider::builder() .with_reader(exporter) .with_view( new_view( Instrument::new().name(http_requests_histogram_name), Stream::new().aggregation( Aggregation::ExplicitBucketHistogram { boundaries: vec![ 0., 0.01, 0.02, 0.03, 0.04, 0.05, 0.06, 0.07, 0.08, 0.09, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1., 2., 3., 4., 5., 6., 7., 8., 9., 10., 20., 30., 40., 50., ], record_min_max: true, }, ), ) .expect("view should be valid"), ) .with_resource(standard_resource(env!("CARGO_PKG_NAME").to_owned())) .build(); let meter = provider.meter(env!("CARGO_PKG_NAME")); // Define metrics let http_requests_histogram = meter .f64_histogram(http_requests_histogram_name) .with_unit("seconds") .with_description("Histogram of HTTP requests") .init(); let lookup = meter .u64_counter("lookup") .with_description("Counts where data is found from") .init(); let on_demand_hashmap_size = meter .u64_gauge("on_demand_hashmap_size") .with_description("Number of entries in 
OnDemandHashMap") .init(); Metrics { otel_state: (registry, provider), http_requests_histogram, lookup, on_demand_hashmap_size, } } /// Export metrics to a string suitable for consumption by e.g. Prometheus pub(crate) fn export(&self) -> String { prometheus::TextEncoder::new() .encode_to_string(&self.otel_state.0.gather()) .expect("should be able to encode metrics") } /// Record that some data was found in a particular storage location pub(crate) fn record_lookup(&self, lookup: Lookup, found_in: FoundIn) { self.lookup.add( 1, &[ KeyValue::new("lookup", <&str>::from(lookup)), KeyValue::new("found_in", <&str>::from(found_in)), ], ); } /// Record size of [`OnDemandHashMap`] /// /// [`OnDemandHashMap`]: crate::utils::on_demand_hashmap::OnDemandHashMap pub(crate) fn record_on_demand_hashmap_size( &self, name: Arc, size: usize, ) { self.on_demand_hashmap_size.record( size.try_into().unwrap_or(u64::MAX), &[KeyValue::new("name", name)], ); } } /// Track HTTP metrics by converting this into an [`axum`] layer pub(crate) async fn http_metrics_layer(req: Request, next: Next) -> Response { /// Routes that should not be included in the metrics static IGNORED_ROUTES: LazyLock> = LazyLock::new(|| [(&Method::GET, "/metrics")].into_iter().collect()); let matched_path = req.extensions().get::().map(|x| x.as_str().to_owned()); let method = req.method().to_owned(); match matched_path { // Run the next layer if the route should be ignored Some(matched_path) if IGNORED_ROUTES.contains(&(&method, matched_path.as_str())) => { next.run(req).await } // Run the next layer if the route is unknown None => next.run(req).await, // Otherwise, run the next layer and record metrics Some(matched_path) => { let start = Instant::now(); let resp = next.run(req).await; let elapsed = start.elapsed(); let status_code = resp.status().as_str().to_owned(); let attrs = &[ KeyValue::new("method", method.as_str().to_owned()), KeyValue::new("path", matched_path), KeyValue::new("status_code", status_code), ]; 
METRICS .http_requests_histogram .record(elapsed.as_secs_f64(), attrs); resp } } } /// Add `traceresponse` header if possible /// /// See also . pub(crate) async fn traceresponse_layer(req: Request, next: Next) -> Response { let mut resp = next.run(req).await; let ids = tracing::dispatcher::get_default(|dispatch| { Span::current() .id() .and_then(|id| { dispatch .downcast_ref::() .and_then(|x| x.span_data(&id)) }) .and_then(|x| { x.extensions() .get::() .and_then(|x| x.builder.trace_id.zip(x.builder.span_id)) }) }); if let Some((trace_id, span_id)) = ids { let headers = resp.headers_mut(); headers.insert( "traceresponse", format!( "{:02x}-{}-{}-{:02x}", 0, trace_id, span_id, // Doesn't seem to be possible to get the SpanContext here, but // this should be a fine default value 0, ) .try_into() .expect("traceresponse value should be a valid header value"), ); } resp } /// Set up observability for CLI-oriented subcommands. /// /// Tracing spans and events will be sent to `stderr`. pub(crate) fn init_for_cli( log_format: LogFormat, env_filter: EnvFilter, ) -> Result<(), SetGlobalDefaultError> { let log_layer = tracing_subscriber::fmt::Layer::new().with_writer(std::io::stderr); let log_layer = match log_format { LogFormat::Pretty => log_layer.pretty().boxed(), LogFormat::Full => log_layer.boxed(), LogFormat::Compact => log_layer.compact().boxed(), LogFormat::Json => log_layer.json().boxed(), }; let log_layer = log_layer.with_filter(env_filter); let subscriber = Registry::default().with(log_layer); tracing::subscriber::set_global_default(subscriber).map_err(Into::into) }