diff --git a/src/cli.rs b/src/cli.rs
index c997ad67..c61fb72e 100644
--- a/src/cli.rs
+++ b/src/cli.rs
@@ -7,8 +7,9 @@ use std::path::PathBuf;
 
 use clap::{Parser, Subcommand};
 
-use crate::error;
+use crate::{config::DatabaseBackend, error};
 
+mod db;
 mod serve;
 
 /// Command line arguments
@@ -26,6 +27,10 @@ pub(crate) struct Args {
 pub(crate) enum Command {
     /// Run the server.
     Serve(ServeArgs),
+
+    /// Database utilities
+    #[clap(subcommand)]
+    Db(DbCommand),
 }
 
 /// Wrapper for the `--config` arg.
@@ -57,10 +62,37 @@ pub(crate) struct ServeArgs {
     pub(crate) config: ConfigArg,
 }
 
+#[derive(Subcommand)]
+pub(crate) enum DbCommand {
+    /// Convert between database backends
+    ///
+    /// Once this command successfully completes, copy or move the `media`
+    /// directory from `IN_PATH` to `OUT_PATH` to complete the migration.
+    Convert(DbConvert),
+}
+
+#[derive(clap::Args)]
+pub(crate) struct DbConvert {
+    /// The backend to convert from
+    in_backend: DatabaseBackend,
+
+    /// The backend to convert to
+    out_backend: DatabaseBackend,
+
+    /// Path to the database to read
+    in_path: PathBuf,
+
+    /// Path to write the new database
+    out_path: PathBuf,
+}
+
 impl Args {
     pub(crate) async fn run(self) -> Result<(), error::Main> {
         match self.command {
             Command::Serve(args) => serve::run(args).await?,
+            Command::Db(DbCommand::Convert(args)) => {
+                db::convert::run(args).await?
+            }
         }
         Ok(())
     }
diff --git a/src/cli/db.rs b/src/cli/db.rs
new file mode 100644
index 00000000..02d63d4e
--- /dev/null
+++ b/src/cli/db.rs
@@ -0,0 +1 @@
+pub(crate) mod convert;
diff --git a/src/cli/db/convert.rs b/src/cli/db/convert.rs
new file mode 100644
index 00000000..7af1e61c
--- /dev/null
+++ b/src/cli/db/convert.rs
@@ -0,0 +1,6 @@
+use crate::{cli::DbConvert, error};
+
+pub(crate) async fn run(_args: DbConvert) -> Result<(), error::DbConvert> {
+    println!("hello world");
+    Ok(())
+}
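The `convert` subcommand takes all four of its arguments positionally, with the backend names supplied by `clap::ValueEnum` on `DatabaseBackend`. A quick sketch of how the derived parser is expected to behave; the binary name and the `Parser` derive on `Args` are assumptions based on the imports above, and the paths are invented:

```rust
#[test]
fn db_convert_args_parse() {
    use clap::Parser;

    // `sqlite` / `rocksdb` come from `clap::ValueEnum`, which lowercases
    // the single-word variant names; the two paths are plain positionals.
    let args = crate::cli::Args::try_parse_from([
        "grapevine", "db", "convert",
        "sqlite", "rocksdb",
        "/var/lib/grapevine-old", "/var/lib/grapevine-new",
    ]);
    assert!(args.is_ok());
}
```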
diff --git a/src/cli/serve.rs b/src/cli/serve.rs
index 0991066b..ae1e67f6 100644
--- a/src/cli/serve.rs
+++ b/src/cli/serve.rs
@@ -1,6 +1,6 @@
 use std::{
-    collections::HashSet, future::Future, net::SocketAddr, sync::atomic,
-    time::Duration,
+    collections::HashSet, future::Future, net::SocketAddr, path::Path,
+    sync::atomic, time::Duration,
 };
 
 use axum::{
@@ -76,8 +76,16 @@ pub(crate) async fn run(args: ServeArgs) -> Result<(), error::ServeCommand> {
 
     info!("Loading database");
     let db = Box::leak(Box::new(
-        KeyValueDatabase::load_or_create(&config)
-            .map_err(Error::DatabaseError)?,
+        KeyValueDatabase::load_or_create(
+            config.database.backend,
+            Path::new(&config.database.path),
+            config.database.cache_capacity_mb,
+            config.database.rocksdb_max_open_files,
+            config.pdu_cache_capacity,
+            config.cache_capacity_modifier,
+            config.conduit_compat,
+        )
+        .map_err(Error::DatabaseError)?,
     ));
 
     Services::build(db, config, reload_handles)
diff --git a/src/config.rs b/src/config.rs
index c980c26b..e7f1d9a1 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -216,7 +216,7 @@ impl Default for TurnConfig {
     }
 }
 
-#[derive(Clone, Copy, Debug, Deserialize)]
+#[derive(Clone, Copy, Debug, Deserialize, clap::ValueEnum)]
 #[serde(rename_all = "lowercase")]
 pub(crate) enum DatabaseBackend {
     #[cfg(feature = "rocksdb")]
     Rocksdb,
@@ -242,7 +242,7 @@ pub(crate) struct DatabaseConfig {
     pub(crate) path: String,
     #[serde(default = "default_db_cache_capacity_mb")]
     pub(crate) cache_capacity_mb: f64,
-    #[cfg(feature = "rocksdb")]
+    #[cfg_attr(not(feature = "rocksdb"), allow(dead_code))]
     #[serde(default = "default_rocksdb_max_open_files")]
     pub(crate) rocksdb_max_open_files: i32,
 }
@@ -385,7 +385,7 @@ fn default_cache_capacity_modifier() -> f64 {
     1.0
 }
 
-#[cfg(feature = "rocksdb")]
+#[cfg_attr(not(feature = "rocksdb"), allow(dead_code))]
 fn default_rocksdb_max_open_files() -> i32 {
     1000
 }
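Deriving `clap::ValueEnum` next to `Deserialize` means the backend names are now parsed in two places: the config file (through serde's `rename_all = "lowercase"`) and the command line. The two agree for these single-word variants, since `ValueEnum`'s kebab-casing of variant names reduces to plain lowercasing here. A standalone sketch of that equivalence, using `serde_json` purely for illustration (it is not a dependency this patch adds):

```rust
use clap::ValueEnum;
use serde::Deserialize;

// Standalone mirror of the patched enum, with the cfg gates dropped.
#[derive(Clone, Copy, Debug, Deserialize, PartialEq, ValueEnum)]
#[serde(rename_all = "lowercase")]
enum DatabaseBackend {
    Rocksdb,
    Sqlite,
}

fn main() {
    // clap side: `Rocksdb` -> "rocksdb", `Sqlite` -> "sqlite".
    let from_cli = DatabaseBackend::from_str("rocksdb", false).unwrap();
    // serde side: `rename_all = "lowercase"` yields the same spellings.
    let from_config: DatabaseBackend =
        serde_json::from_str("\"rocksdb\"").unwrap();
    assert_eq!(from_cli, from_config);
}
```

The `#[cfg_attr(not(feature = "rocksdb"), allow(dead_code))]` swap matters for the same reason: `rocksdb_max_open_files` is now passed to `load_or_create` unconditionally, so the field has to exist in every feature combination instead of being compiled out.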
diff --git a/src/database.rs b/src/database.rs
index 641f1f7c..41275b56 100644
--- a/src/database.rs
+++ b/src/database.rs
@@ -28,7 +28,7 @@ use crate::{
             timeline::PduCount,
         },
     },
-    services, utils, Config, Error, PduEvent, Result,
+    services, utils, Error, PduEvent, Result,
 };
 
 pub(crate) mod abstraction;
@@ -257,13 +257,15 @@ pub(crate) struct KeyValueDatabase {
 }
 
 impl KeyValueDatabase {
-    fn check_db_setup(config: &Config) -> Result<()> {
-        let path = Path::new(&config.database.path);
-
+    fn check_db_setup(
+        backend: DatabaseBackend,
+        path: &Path,
+        conduit_compat: bool,
+    ) -> Result<()> {
         let sqlite_exists = path
             .join(format!(
                 "{}.db",
-                if config.conduit_compat {
+                if conduit_compat {
                     "conduit"
                 } else {
                     "grapevine"
@@ -288,7 +290,7 @@ impl KeyValueDatabase {
         }
 
         let (backend_is_rocksdb, backend_is_sqlite): (bool, bool) =
-            match config.database.backend {
+            match backend {
                 #[cfg(feature = "rocksdb")]
                 DatabaseBackend::Rocksdb => (true, false),
                 #[cfg(feature = "sqlite")]
@@ -312,18 +314,22 @@ impl KeyValueDatabase {
         Ok(())
     }
 
-    /// Load an existing database or create a new one, and initialize all
-    /// services with the loaded database.
-    #[cfg_attr(
-        not(any(feature = "rocksdb", feature = "sqlite")),
-        allow(unreachable_code)
-    )]
-    #[allow(clippy::too_many_lines)]
-    pub(crate) fn load_or_create(config: &Config) -> Result<Self> {
-        Self::check_db_setup(config)?;
+    pub(crate) fn load_or_create_engine(
+        backend: DatabaseBackend,
+        path: &Path,
+        cache_capacity_mb: f64,
+        rocksdb_max_open_files: i32,
+        conduit_compat: bool,
+    ) -> Result<Arc<dyn KeyValueDatabaseEngine>> {
+        #[cfg(not(any(feature = "rocksdb", feature = "sqlite")))]
+        return Err(Error::BadConfig(
+            "Compiled without support for any databases",
+        ));
 
-        if !Path::new(&config.database.path).exists() {
-            fs::create_dir_all(&config.database.path).map_err(|_| {
+        Self::check_db_setup(backend, path, conduit_compat)?;
+
+        if !Path::new(path).exists() {
+            fs::create_dir_all(path).map_err(|_| {
                 Error::BadConfig(
                     "Database folder doesn't exist and couldn't be created \
                      (e.g. due to missing permissions). Please create the \
@@ -332,21 +338,50 @@ impl KeyValueDatabase {
             })?;
         }
 
-        #[cfg_attr(
-            not(any(feature = "rocksdb", feature = "sqlite")),
-            allow(unused_variables)
-        )]
-        let builder: Arc<dyn KeyValueDatabaseEngine> =
-            match config.database.backend {
-                #[cfg(feature = "sqlite")]
-                DatabaseBackend::Sqlite => {
-                    Arc::new(Arc::<abstraction::sqlite::Engine>::open(config)?)
-                }
-                #[cfg(feature = "rocksdb")]
-                DatabaseBackend::Rocksdb => {
-                    Arc::new(Arc::<abstraction::rocksdb::Engine>::open(config)?)
-                }
-            };
+        let x: Arc<dyn KeyValueDatabaseEngine> = match backend {
+            #[cfg(feature = "sqlite")]
+            DatabaseBackend::Sqlite => {
+                Arc::new(Arc::new(abstraction::sqlite::Engine::open(
+                    path,
+                    cache_capacity_mb,
+                    conduit_compat,
+                )?))
+            }
+            #[cfg(feature = "rocksdb")]
+            DatabaseBackend::Rocksdb => {
+                Arc::new(Arc::new(abstraction::rocksdb::Engine::open(
+                    path,
+                    cache_capacity_mb,
+                    rocksdb_max_open_files,
+                )?))
+            }
+        };
+
+        #[cfg(any(feature = "rocksdb", feature = "sqlite"))]
+        return Ok(x);
+    }
+
+    #[cfg_attr(
+        not(any(feature = "rocksdb", feature = "sqlite")),
+        allow(unreachable_code)
+    )]
+    #[allow(clippy::too_many_lines)]
+    pub(crate) fn load_or_create(
+        backend: DatabaseBackend,
+        path: &Path,
+        cache_capacity_mb: f64,
+        rocksdb_max_open_files: i32,
+        pdu_cache_capacity: u32,
+        cache_capacity_modifier: f64,
+        conduit_compat: bool,
+    ) -> Result<Self> {
+        let builder = Self::load_or_create_engine(
+            backend,
+            path,
+            cache_capacity_mb,
+            rocksdb_max_open_files,
+            conduit_compat,
+        )?;
 
         let db = Self {
             db: builder.clone(),
@@ -474,8 +509,7 @@ impl KeyValueDatabase {
             server_signingkeys: builder.open_tree("server_signingkeys")?,
 
             pdu_cache: Mutex::new(LruCache::new(
-                config
-                    .pdu_cache_capacity
+                pdu_cache_capacity
                     .try_into()
                     .expect("pdu cache capacity fits into usize"),
             )),
@@ -485,7 +519,7 @@ impl KeyValueDatabase {
                 clippy::cast_possible_truncation
             )]
             auth_chain_cache: Mutex::new(LruCache::new(
-                (100_000.0 * config.cache_capacity_modifier) as usize,
+                (100_000.0 * cache_capacity_modifier) as usize,
             )),
             #[allow(
                 clippy::as_conversions,
@@ -493,7 +527,7 @@ impl KeyValueDatabase {
                 clippy::cast_possible_truncation
            )]
             shorteventid_cache: Mutex::new(LruCache::new(
-                (100_000.0 * config.cache_capacity_modifier) as usize,
+                (100_000.0 * cache_capacity_modifier) as usize,
             )),
             #[allow(
                 clippy::as_conversions,
@@ -501,7 +535,7 @@ impl KeyValueDatabase {
                 clippy::cast_possible_truncation
             )]
             eventidshort_cache: Mutex::new(LruCache::new(
-                (100_000.0 * config.cache_capacity_modifier) as usize,
+                (100_000.0 * cache_capacity_modifier) as usize,
             )),
             #[allow(
                 clippy::as_conversions,
@@ -509,7 +543,7 @@ impl KeyValueDatabase {
                 clippy::cast_possible_truncation
             )]
             shortstatekey_cache: Mutex::new(LruCache::new(
-                (100_000.0 * config.cache_capacity_modifier) as usize,
+                (100_000.0 * cache_capacity_modifier) as usize,
             )),
             #[allow(
                 clippy::as_conversions,
@@ -517,7 +551,7 @@ impl KeyValueDatabase {
                 clippy::cast_possible_truncation
            )]
             statekeyshort_cache: Mutex::new(LruCache::new(
-                (100_000.0 * config.cache_capacity_modifier) as usize,
+                (100_000.0 * cache_capacity_modifier) as usize,
             )),
             our_real_users_cache: RwLock::new(HashMap::new()),
             appservice_in_room_cache: RwLock::new(HashMap::new()),
diff --git a/src/database/abstraction.rs b/src/database/abstraction.rs
index db14c728..37fa214e 100644
--- a/src/database/abstraction.rs
+++ b/src/database/abstraction.rs
@@ -12,10 +12,6 @@
 pub(crate) mod rocksdb;
 pub(crate) mod watchers;
 
 pub(crate) trait KeyValueDatabaseEngine: Send + Sync {
-    #[cfg(any(feature = "sqlite", feature = "rocksdb"))]
-    fn open(config: &super::Config) -> Result<Self>
-    where
-        Self: Sized;
     fn open_tree(&self, name: &'static str) -> Result<Arc<dyn KvTree>>;
     fn cleanup(&self) -> Result<()> {
         Ok(())
     }
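Factoring `load_or_create_engine` out of `load_or_create` (and dropping `open` from the `KeyValueDatabaseEngine` trait) is what lets the `db convert` stub eventually open two bare engines without constructing the full `KeyValueDatabase`. A hedged sketch of where that could go; the tree-name list is an invented subset, the cache settings are placeholders, and `KvTree::iter`/`KvTree::insert` are assumed to exist as they do in Conduit's abstraction layer. None of this is in the patch:

```rust
use crate::{cli::DbConvert, database::KeyValueDatabase, error};

// Hypothetical future body for cli::db::convert::run.
pub(crate) async fn run(args: DbConvert) -> Result<(), error::DbConvert> {
    const TREE_NAMES: &[&str] = &["global", "pduid_pdu"]; // invented subset

    let input = KeyValueDatabase::load_or_create_engine(
        args.in_backend,
        &args.in_path,
        300.0, // cache_capacity_mb, placeholder
        1000,  // rocksdb_max_open_files, placeholder
        false, // conduit_compat, placeholder
    )?;
    let output = KeyValueDatabase::load_or_create_engine(
        args.out_backend,
        &args.out_path,
        300.0,
        1000,
        false,
    )?;

    // Copy every key-value pair, tree by tree.
    for &name in TREE_NAMES {
        let src = input.open_tree(name)?;
        let dst = output.open_tree(name)?;
        for (key, value) in src.iter() {
            dst.insert(&key, &value)?;
        }
    }
    Ok(())
}
```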
diff --git a/src/database/abstraction/rocksdb.rs b/src/database/abstraction/rocksdb.rs
index 72e676a1..9ddeaeca 100644
--- a/src/database/abstraction/rocksdb.rs
+++ b/src/database/abstraction/rocksdb.rs
@@ -1,6 +1,7 @@
 use std::{
     collections::HashSet,
     future::Future,
+    path::Path,
     pin::Pin,
     sync::{Arc, Mutex, RwLock},
 };
@@ -13,9 +14,7 @@ use rocksdb::{
 };
 use tracing::Level;
 
-use super::{
-    super::Config, watchers::Watchers, KeyValueDatabaseEngine, KvTree,
-};
+use super::{watchers::Watchers, KeyValueDatabaseEngine, KvTree};
 use crate::{utils, Result};
 
@@ -70,50 +69,49 @@ fn db_options(max_open_files: i32, rocksdb_cache: &Cache) -> Options {
     db_opts
 }
 
-impl KeyValueDatabaseEngine for Arc<Engine> {
-    fn open(config: &Config) -> Result<Self> {
+impl Engine {
+    pub(crate) fn open(
+        path: &Path,
+        cache_capacity_mb: f64,
+        max_open_files: i32,
+    ) -> Result<Self> {
         #[allow(
             clippy::as_conversions,
             clippy::cast_sign_loss,
             clippy::cast_possible_truncation
         )]
         let cache_capacity_bytes =
-            (config.database.cache_capacity_mb * 1024.0 * 1024.0) as usize;
+            (cache_capacity_mb * 1024.0 * 1024.0) as usize;
 
         let rocksdb_cache = Cache::new_lru_cache(cache_capacity_bytes);
 
-        let db_opts =
-            db_options(config.database.rocksdb_max_open_files, &rocksdb_cache);
+        let db_opts = db_options(max_open_files, &rocksdb_cache);
 
-        let cfs = DBWithThreadMode::<MultiThreaded>::list_cf(
-            &db_opts,
-            &config.database.path,
-        )
-        .map(|x| x.into_iter().collect::<HashSet<_>>())
-        .unwrap_or_default();
+        let cfs = DBWithThreadMode::<MultiThreaded>::list_cf(&db_opts, path)
+            .map(|x| x.into_iter().collect::<HashSet<_>>())
+            .unwrap_or_default();
 
         let db = DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
             &db_opts,
-            &config.database.path,
+            path,
             cfs.iter().map(|name| {
                 ColumnFamilyDescriptor::new(
                     name,
-                    db_options(
-                        config.database.rocksdb_max_open_files,
-                        &rocksdb_cache,
-                    ),
+                    db_options(max_open_files, &rocksdb_cache),
                 )
             }),
         )?;
 
-        Ok(Arc::new(Engine {
+        Ok(Engine {
             rocks: db,
-            max_open_files: config.database.rocksdb_max_open_files,
+            max_open_files,
             cache: rocksdb_cache,
             old_cfs: cfs,
             new_cfs: Mutex::default(),
-        }))
+        })
     }
+}
 
+impl KeyValueDatabaseEngine for Arc<Engine> {
     fn open_tree(&self, name: &'static str) -> Result<Arc<dyn KvTree>> {
         let mut new_cfs =
             self.new_cfs.lock().expect("lock should not be poisoned");
diff --git a/src/database/abstraction/sqlite.rs b/src/database/abstraction/sqlite.rs
index 7b41fc79..af62d7aa 100644
--- a/src/database/abstraction/sqlite.rs
+++ b/src/database/abstraction/sqlite.rs
@@ -12,7 +12,7 @@ use thread_local::ThreadLocal;
 use tracing::debug;
 
 use super::{watchers::Watchers, KeyValueDatabaseEngine, KvTree};
-use crate::{database::Config, Result};
+use crate::Result;
 
 thread_local! {
     static READ_CONNECTION: RefCell<Option<&'static Connection>> =
@@ -66,6 +66,47 @@ pub(crate) struct Engine {
 }
 
 impl Engine {
+    pub(crate) fn open(
+        path: &Path,
+        cache_capacity_mb: f64,
+        conduit_compat: bool,
+    ) -> Result<Self> {
+        let path = path.join(format!(
+            "{}.db",
+            if conduit_compat {
+                "conduit"
+            } else {
+                "grapevine"
+            }
+        ));
+
+        // calculates cache-size per permanent connection
+        // 1. convert MB to KiB
+        // 2. divide by permanent connections + permanent iter connections +
+        //    write connection
+        // 3. round down to nearest integer
+        #[allow(
+            clippy::as_conversions,
+            clippy::cast_possible_truncation,
+            clippy::cast_precision_loss,
+            clippy::cast_sign_loss
+        )]
+        let cache_size_per_thread = ((cache_capacity_mb * 1024.0)
+            / ((num_cpus::get() as f64 * 2.0) + 1.0))
+            as u32;
+
+        let writer =
+            Mutex::new(Engine::prepare_conn(&path, cache_size_per_thread)?);
+
+        Ok(Engine {
+            writer,
+            read_conn_tls: ThreadLocal::new(),
+            read_iterator_conn_tls: ThreadLocal::new(),
+            path,
+            cache_size_per_thread,
+        })
+    }
+
     fn prepare_conn(path: &Path, cache_size_kb: u32) -> Result<Connection> {
         let conn = Connection::open(path)?;
 
@@ -109,45 +150,6 @@ impl Engine {
 }
 
 impl KeyValueDatabaseEngine for Arc<Engine> {
-    fn open(config: &Config) -> Result<Self> {
-        let path = Path::new(&config.database.path).join(format!(
-            "{}.db",
-            if config.conduit_compat {
-                "conduit"
-            } else {
-                "grapevine"
-            }
-        ));
-
-        // calculates cache-size per permanent connection
-        // 1. convert MB to KiB
-        // 2. divide by permanent connections + permanent iter connections +
-        //    write connection
-        // 3. round down to nearest integer
-        #[allow(
-            clippy::as_conversions,
-            clippy::cast_possible_truncation,
-            clippy::cast_precision_loss,
-            clippy::cast_sign_loss
-        )]
-        let cache_size_per_thread =
-            ((config.database.cache_capacity_mb * 1024.0)
-                / ((num_cpus::get() as f64 * 2.0) + 1.0)) as u32;
-
-        let writer =
-            Mutex::new(Engine::prepare_conn(&path, cache_size_per_thread)?);
-
-        let arc = Arc::new(Engine {
-            writer,
-            read_conn_tls: ThreadLocal::new(),
-            read_iterator_conn_tls: ThreadLocal::new(),
-            path,
-            cache_size_per_thread,
-        });
-
-        Ok(arc)
-    }
-
     fn open_tree(&self, name: &str) -> Result<Arc<dyn KvTree>> {
         self.write_lock().execute(
             &format!(
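The SQLite per-connection cache arithmetic that moved into `Engine::open` is easiest to sanity-check with concrete numbers. With, say, a 300 MB cache and `num_cpus::get() == 4` (four read connections, four iterator connections, plus the one writer, so nine connections total), each connection gets 300 × 1024 / 9 ≈ 34133 KiB. The values here are examples, not defaults the patch sets:

```rust
#[test]
fn sqlite_cache_size_example() {
    // Example inputs: 300 MB cache, pretend num_cpus::get() returned 4.
    let cache_capacity_mb = 300.0_f64;
    let connections = (4.0_f64 * 2.0) + 1.0; // readers + iterators + writer
    #[allow(
        clippy::as_conversions,
        clippy::cast_possible_truncation,
        clippy::cast_sign_loss
    )]
    let cache_size_per_thread =
        ((cache_capacity_mb * 1024.0) / connections) as u32;
    assert_eq!(cache_size_per_thread, 34_133); // KiB per connection
}
```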
diff --git a/src/error.rs b/src/error.rs
index 72bcd337..0fb4be1a 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -42,6 +42,8 @@ impl fmt::Display for DisplayWithSources<'_> {
 pub(crate) enum Main {
     #[error(transparent)]
     ServeCommand(#[from] ServeCommand),
+    #[error(transparent)]
+    DbConvert(#[from] DbConvert),
 }
 
 /// Errors returned from the `serve` CLI subcommand.
@@ -167,3 +169,6 @@ pub(crate) enum Serve {
     )]
     FederationSelfTestFailed(#[source] crate::Error),
 }
+
+/// Errors converting between database backends
+pub(crate) type DbConvert = Box<dyn std::error::Error>;
diff --git a/src/service/globals.rs b/src/service/globals.rs
index 1026135c..fc292bd8 100644
--- a/src/service/globals.rs
+++ b/src/service/globals.rs
@@ -299,7 +299,12 @@
         if !s
             .supported_room_versions()
             .contains(&s.config.default_room_version)
         {
-            error!(config=?s.config.default_room_version, fallback=?crate::config::default_default_room_version(), "Room version in config isn't supported, falling back to default version");
+            error!(
+                config=?s.config.default_room_version,
+                fallback=?crate::config::default_default_room_version(),
+                "Room version in config isn't supported, falling back to \
+                 default version",
+            );
             s.config.default_room_version =
                 crate::config::default_default_room_version();
         };
@@ -610,9 +615,7 @@ impl Service {
     }
 
     pub(crate) fn get_media_file(&self, key: &MediaFileKey) -> PathBuf {
-        let mut r = PathBuf::new();
-        r.push(self.config.database.path.clone());
-        r.push("media");
+        let mut r = self.get_media_folder();
         r.push(general_purpose::URL_SAFE_NO_PAD.encode(key.as_bytes()));
         r
     }
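With `get_media_file` now delegating to `get_media_folder` (which is not shown in this diff), the media directory is derived in one place; that is what the `db convert` doc comment's advice to copy or move the `media` directory relies on, since media files live next to the database rather than inside it. A sketch of the resulting on-disk layout, assuming `get_media_folder` still returns `<database.path>/media` as the removed inline code did, with an invented key and path:

```rust
#[test]
fn media_path_example() {
    use std::path::PathBuf;

    use base64::{engine::general_purpose, Engine as _};

    // Assumed layout: <database.path>/media/<base64url-nopad(key bytes)>.
    let mut r = PathBuf::from("/var/lib/grapevine"); // example database.path
    r.push("media");
    r.push(general_purpose::URL_SAFE_NO_PAD.encode(b"$someMediaKey"));
    assert_eq!(
        r,
        PathBuf::from("/var/lib/grapevine/media/JHNvbWVNZWRpYUtleQ")
    );
}
```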