factor observability things into its own module

And also create a new error for the observability initialization.
This commit is contained in:
Charles Hall 2024-05-23 13:39:40 -07:00
parent a275543494
commit c17ab5328d
No known key found for this signature in database
GPG key ID: 7B8E0645816E07CF
3 changed files with 81 additions and 56 deletions

View file

@ -48,6 +48,22 @@ pub(crate) enum Main {
#[error("invalid configuration")] #[error("invalid configuration")]
ConfigInvalid(#[from] figment::Error), ConfigInvalid(#[from] figment::Error),
#[error("failed to initialize observability")]
Observability(#[from] Observability),
#[error("failed to load or create the database")]
DatabaseError(#[source] crate::utils::error::Error),
#[error("failed to serve requests")]
Serve(#[source] std::io::Error),
}
/// Observability initialization errors
// Missing docs are allowed here since that kind of information should be
// encoded in the error messages themselves anyway.
#[allow(missing_docs)]
#[derive(Error, Debug)]
pub(crate) enum Observability {
// Upstream's documentation on what this error means is very sparse // Upstream's documentation on what this error means is very sparse
#[error("opentelemetry error")] #[error("opentelemetry error")]
Otel(#[from] opentelemetry::trace::TraceError), Otel(#[from] opentelemetry::trace::TraceError),
@ -61,10 +77,4 @@ pub(crate) enum Main {
// Upstream's documentation on what this error means is very sparse // Upstream's documentation on what this error means is very sparse
#[error("tracing_flame error")] #[error("tracing_flame error")]
TracingFlame(#[from] tracing_flame::Error), TracingFlame(#[from] tracing_flame::Error),
#[error("failed to load or create the database")]
DatabaseError(#[source] crate::utils::error::Error),
#[error("failed to serve requests")]
Serve(#[source] std::io::Error),
} }

View file

@ -24,8 +24,6 @@ use http::{
header::{self, HeaderName}, header::{self, HeaderName},
Method, StatusCode, Uri, Method, StatusCode, Uri,
}; };
use opentelemetry::KeyValue;
use opentelemetry_sdk::Resource;
use ruma::api::{ use ruma::api::{
client::{ client::{
error::{Error as RumaError, ErrorBody, ErrorKind}, error::{Error as RumaError, ErrorBody, ErrorKind},
@ -41,13 +39,13 @@ use tower_http::{
ServiceBuilderExt as _, ServiceBuilderExt as _,
}; };
use tracing::{debug, info, warn}; use tracing::{debug, info, warn};
use tracing_subscriber::{prelude::*, EnvFilter};
pub(crate) mod api; pub(crate) mod api;
pub(crate) mod clap; pub(crate) mod clap;
mod config; mod config;
mod database; mod database;
mod error; mod error;
mod observability;
mod service; mod service;
mod utils; mod utils;
@ -127,48 +125,7 @@ async fn try_main() -> Result<(), error::Main> {
config.warn_deprecated(); config.warn_deprecated();
if config.allow_jaeger { let _guard = observability::init(&config);
opentelemetry::global::set_text_map_propagator(
opentelemetry_jaeger_propagator::Propagator::new(),
);
let tracer = opentelemetry_otlp::new_pipeline()
.tracing()
.with_trace_config(
opentelemetry_sdk::trace::config().with_resource(
Resource::new(vec![KeyValue::new(
"service.name",
env!("CARGO_PKG_NAME"),
)]),
),
)
.with_exporter(opentelemetry_otlp::new_exporter().tonic())
.install_batch(opentelemetry_sdk::runtime::Tokio)?;
let telemetry = tracing_opentelemetry::layer().with_tracer(tracer);
let filter_layer = EnvFilter::try_new(&config.log)?;
let subscriber = tracing_subscriber::Registry::default()
.with(filter_layer)
.with(telemetry);
tracing::subscriber::set_global_default(subscriber)?;
} else if config.tracing_flame {
let registry = tracing_subscriber::Registry::default();
let (flame_layer, _guard) =
tracing_flame::FlameLayer::with_file("./tracing.folded")?;
let flame_layer = flame_layer.with_empty_samples(false);
let filter_layer = EnvFilter::new("trace,h2=off");
let subscriber = registry.with(filter_layer).with(flame_layer);
tracing::subscriber::set_global_default(subscriber)?;
} else {
let registry = tracing_subscriber::Registry::default();
let fmt_layer = tracing_subscriber::fmt::Layer::new();
let filter_layer = EnvFilter::try_new(&config.log)?;
let subscriber = registry.with(filter_layer).with(fmt_layer);
tracing::subscriber::set_global_default(subscriber)?;
}
// This is needed for opening lots of file descriptors, which tends to // This is needed for opening lots of file descriptors, which tends to
// happen more often when using RocksDB and making lots of federation // happen more often when using RocksDB and making lots of federation
@ -185,15 +142,10 @@ async fn try_main() -> Result<(), error::Main> {
KeyValueDatabase::load_or_create(config) KeyValueDatabase::load_or_create(config)
.await .await
.map_err(Error::DatabaseError)?; .map_err(Error::DatabaseError)?;
let config = &services().globals.config;
info!("Starting server"); info!("Starting server");
run_server().await.map_err(Error::Serve)?; run_server().await.map_err(Error::Serve)?;
if config.allow_jaeger {
opentelemetry::global::shutdown_tracer_provider();
}
Ok(()) Ok(())
} }

63
src/observability.rs Normal file
View file

@ -0,0 +1,63 @@
//! Facilities for observing runtime behavior
#![warn(missing_docs, clippy::missing_docs_in_private_items)]
use opentelemetry::KeyValue;
use opentelemetry_sdk::Resource;
use tracing_flame::FlameLayer;
use tracing_subscriber::{layer::SubscriberExt, EnvFilter, Registry};
use crate::{config::Config, error, utils::error::Result};
/// Cleans up resources relating to observability when [`Drop`]ped
pub(crate) struct Guard;
impl Drop for Guard {
fn drop(&mut self) {
opentelemetry::global::shutdown_tracer_provider();
}
}
/// Initialize observability
pub(crate) fn init(config: &Config) -> Result<Guard, error::Observability> {
if config.allow_jaeger {
opentelemetry::global::set_text_map_propagator(
opentelemetry_jaeger_propagator::Propagator::new(),
);
let tracer = opentelemetry_otlp::new_pipeline()
.tracing()
.with_trace_config(
opentelemetry_sdk::trace::config().with_resource(
Resource::new(vec![KeyValue::new(
"service.name",
env!("CARGO_PKG_NAME"),
)]),
),
)
.with_exporter(opentelemetry_otlp::new_exporter().tonic())
.install_batch(opentelemetry_sdk::runtime::Tokio)?;
let telemetry = tracing_opentelemetry::layer().with_tracer(tracer);
let filter_layer = EnvFilter::try_new(&config.log)?;
let subscriber = Registry::default().with(filter_layer).with(telemetry);
tracing::subscriber::set_global_default(subscriber)?;
} else if config.tracing_flame {
let registry = Registry::default();
let (flame_layer, _guard) = FlameLayer::with_file("./tracing.folded")?;
let flame_layer = flame_layer.with_empty_samples(false);
let filter_layer = EnvFilter::new("trace,h2=off");
let subscriber = registry.with(filter_layer).with(flame_layer);
tracing::subscriber::set_global_default(subscriber)?;
} else {
let registry = Registry::default();
let fmt_layer = tracing_subscriber::fmt::Layer::new();
let filter_layer = EnvFilter::try_new(&config.log)?;
let subscriber = registry.with(filter_layer).with(fmt_layer);
tracing::subscriber::set_global_default(subscriber)?;
}
Ok(Guard)
}