Merge branch 'charles/db-convert' into 'main'

Draft: Database backend conversion tool

See merge request matrix/grapevine!116
Charles Hall 2024-10-15 16:15:41 +00:00
commit c956e8f999
11 changed files with 202 additions and 117 deletions

@@ -7,8 +7,9 @@ use std::path::PathBuf;
 
 use clap::{Parser, Subcommand};
 
-use crate::error;
+use crate::{config::DatabaseBackend, error};
 
+mod db;
 mod serve;
 
 /// Command line arguments
@@ -26,6 +27,10 @@ pub(crate) struct Args {
 pub(crate) enum Command {
     /// Run the server.
     Serve(ServeArgs),
+
+    /// Database utilities
+    #[clap(subcommand)]
+    Db(DbCommand),
 }
 
 /// Wrapper for the `--config` arg.
@@ -57,10 +62,37 @@ pub(crate) struct ServeArgs {
     pub(crate) config: ConfigArg,
 }
 
+#[derive(Subcommand)]
+pub(crate) enum DbCommand {
+    /// Convert between database backends
+    ///
+    /// Once this command successfully completes, copy or move the `media`
+    /// directory from `IN_PATH` to `OUT_PATH` to complete the migration.
+    Convert(DbConvert),
+}
+
+#[derive(clap::Args)]
+pub(crate) struct DbConvert {
+    /// The backend to convert from
+    in_backend: DatabaseBackend,
+
+    /// The backend to convert to
+    out_backend: DatabaseBackend,
+
+    /// Path to the database to read
+    in_path: PathBuf,
+
+    /// Path to write the new database
+    out_path: PathBuf,
+}
+
 impl Args {
     pub(crate) async fn run(self) -> Result<(), error::Main> {
         match self.command {
             Command::Serve(args) => serve::run(args).await?,
+            Command::Db(DbCommand::Convert(args)) => {
+                db::convert::run(args).await?
+            }
         }
 
         Ok(())
     }

src/cli/db.rs (new file)

@@ -0,0 +1 @@
+pub(crate) mod convert;

src/cli/db/convert.rs (new file)

@@ -0,0 +1,6 @@
+use crate::{cli::DbConvert, error};
+
+pub(crate) async fn run(args: DbConvert) -> Result<(), error::DbConvert> {
+    println!("hello world");
+    Ok(())
+}

@@ -1,6 +1,6 @@
 use std::{
-    collections::HashSet, future::Future, net::SocketAddr, sync::atomic,
-    time::Duration,
+    collections::HashSet, future::Future, net::SocketAddr, path::Path,
+    sync::atomic, time::Duration,
 };
 
 use axum::{
@@ -76,8 +76,16 @@ pub(crate) async fn run(args: ServeArgs) -> Result<(), error::ServeCommand> {
     info!("Loading database");
     let db = Box::leak(Box::new(
-        KeyValueDatabase::load_or_create(&config)
-            .map_err(Error::DatabaseError)?,
+        KeyValueDatabase::load_or_create(
+            config.database.backend,
+            Path::new(&config.database.path),
+            config.database.cache_capacity_mb,
+            config.database.rocksdb_max_open_files,
+            config.pdu_cache_capacity,
+            config.cache_capacity_modifier,
+            config.conduit_compat,
+        )
+        .map_err(Error::DatabaseError)?,
     ));
 
     Services::build(db, config, reload_handles)

@@ -216,7 +216,7 @@ impl Default for TurnConfig {
     }
 }
 
-#[derive(Clone, Copy, Debug, Deserialize)]
+#[derive(Clone, Copy, Debug, Deserialize, clap::ValueEnum)]
 #[serde(rename_all = "lowercase")]
 pub(crate) enum DatabaseBackend {
     #[cfg(feature = "rocksdb")]
@@ -242,7 +242,7 @@ pub(crate) struct DatabaseConfig {
     pub(crate) path: String,
     #[serde(default = "default_db_cache_capacity_mb")]
     pub(crate) cache_capacity_mb: f64,
-    #[cfg(feature = "rocksdb")]
+    #[cfg_attr(not(feature = "rocksdb"), allow(dead_code))]
     #[serde(default = "default_rocksdb_max_open_files")]
     pub(crate) rocksdb_max_open_files: i32,
 }
@@ -385,7 +385,7 @@ fn default_cache_capacity_modifier() -> f64 {
     1.0
 }
 
-#[cfg(feature = "rocksdb")]
+#[cfg_attr(not(feature = "rocksdb"), allow(dead_code))]
 fn default_rocksdb_max_open_files() -> i32 {
     1000
 }
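Deriving `clap::ValueEnum` here is what lets the new `db convert` positional arguments take a backend name directly on the command line, mirroring the lowercase names the config file already accepts through `serde(rename_all = "lowercase")`. A minimal self-contained sketch of that behaviour (the two-variant enum below is a simplification; the real one gates each variant behind a cargo feature):

use clap::ValueEnum;

#[derive(Clone, Copy, Debug, PartialEq, ValueEnum)]
enum DatabaseBackend {
    Rocksdb,
    Sqlite,
}

fn main() {
    // The derive exposes kebab-cased variant names, so "rocksdb" and
    // "sqlite" are accepted as CLI values.
    assert_eq!(
        DatabaseBackend::from_str("sqlite", true),
        Ok(DatabaseBackend::Sqlite)
    );
}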

@@ -28,7 +28,7 @@ use crate::{
             timeline::PduCount,
         },
     },
-    services, utils, Config, Error, PduEvent, Result,
+    services, utils, Error, PduEvent, Result,
 };
 
 pub(crate) mod abstraction;
@@ -257,13 +257,15 @@ pub(crate) struct KeyValueDatabase {
 }
 
 impl KeyValueDatabase {
-    fn check_db_setup(config: &Config) -> Result<()> {
-        let path = Path::new(&config.database.path);
+    fn check_db_setup(
+        backend: DatabaseBackend,
+        path: &Path,
+        conduit_compat: bool,
+    ) -> Result<()> {
         let sqlite_exists = path
             .join(format!(
                 "{}.db",
-                if config.conduit_compat {
+                if conduit_compat {
                     "conduit"
                 } else {
                     "grapevine"
@@ -288,7 +290,7 @@ impl KeyValueDatabase {
         }
 
         let (backend_is_rocksdb, backend_is_sqlite): (bool, bool) =
-            match config.database.backend {
+            match backend {
                 #[cfg(feature = "rocksdb")]
                 DatabaseBackend::Rocksdb => (true, false),
                 #[cfg(feature = "sqlite")]
@@ -312,18 +314,22 @@ impl KeyValueDatabase {
         Ok(())
     }
 
-    /// Load an existing database or create a new one, and initialize all
-    /// services with the loaded database.
-    #[cfg_attr(
-        not(any(feature = "rocksdb", feature = "sqlite")),
-        allow(unreachable_code)
-    )]
-    #[allow(clippy::too_many_lines)]
-    pub(crate) fn load_or_create(config: &Config) -> Result<KeyValueDatabase> {
-        Self::check_db_setup(config)?;
+    pub(crate) fn load_or_create_engine(
+        backend: DatabaseBackend,
+        path: &Path,
+        cache_capacity_mb: f64,
+        rocksdb_max_open_files: i32,
+        conduit_compat: bool,
+    ) -> Result<Arc<dyn KeyValueDatabaseEngine>> {
+        #[cfg(not(any(feature = "rocksdb", feature = "sqlite")))]
+        return Err(Error::BadConfig(
+            "Compiled without support for any databases",
+        ));
+
+        Self::check_db_setup(backend, path, conduit_compat)?;
 
-        if !Path::new(&config.database.path).exists() {
-            fs::create_dir_all(&config.database.path).map_err(|_| {
+        if !Path::new(path).exists() {
+            fs::create_dir_all(path).map_err(|_| {
                 Error::BadConfig(
                     "Database folder doesn't exists and couldn't be created \
                     (e.g. due to missing permissions). Please create the \
@@ -332,21 +338,50 @@ impl KeyValueDatabase {
             })?;
         }
 
-        #[cfg_attr(
-            not(any(feature = "rocksdb", feature = "sqlite")),
-            allow(unused_variables)
-        )]
-        let builder: Arc<dyn KeyValueDatabaseEngine> =
-            match config.database.backend {
-                #[cfg(feature = "sqlite")]
-                DatabaseBackend::Sqlite => {
-                    Arc::new(Arc::<abstraction::sqlite::Engine>::open(config)?)
-                }
-                #[cfg(feature = "rocksdb")]
-                DatabaseBackend::Rocksdb => {
-                    Arc::new(Arc::<abstraction::rocksdb::Engine>::open(config)?)
-                }
-            };
+        let x: Arc<dyn KeyValueDatabaseEngine> = match backend {
+            #[cfg(feature = "sqlite")]
+            DatabaseBackend::Sqlite => {
+                Arc::new(Arc::new(abstraction::sqlite::Engine::open(
+                    path,
+                    cache_capacity_mb,
+                    conduit_compat,
+                )?))
+            }
+            #[cfg(feature = "rocksdb")]
+            DatabaseBackend::Rocksdb => {
+                Arc::new(Arc::new(abstraction::rocksdb::Engine::open(
+                    path,
+                    cache_capacity_mb,
+                    rocksdb_max_open_files,
+                )?))
+            }
+        };
+
+        #[cfg(any(feature = "rocksdb", feature = "sqlite"))]
+        return Ok(x);
+    }
+
+    #[cfg_attr(
+        not(any(feature = "rocksdb", feature = "sqlite")),
+        allow(unreachable_code)
+    )]
+    #[allow(clippy::too_many_lines)]
+    pub(crate) fn load_or_create(
+        backend: DatabaseBackend,
+        path: &Path,
+        cache_capacity_mb: f64,
+        rocksdb_max_open_files: i32,
+        pdu_cache_capacity: u32,
+        cache_capacity_modifier: f64,
+        conduit_compat: bool,
+    ) -> Result<KeyValueDatabase> {
+        let builder = Self::load_or_create_engine(
+            backend,
+            path,
+            cache_capacity_mb,
+            rocksdb_max_open_files,
+            conduit_compat,
+        )?;
 
         let db = Self {
             db: builder.clone(),
@@ -474,8 +509,7 @@ impl KeyValueDatabase {
             server_signingkeys: builder.open_tree("server_signingkeys")?,
 
             pdu_cache: Mutex::new(LruCache::new(
-                config
-                    .pdu_cache_capacity
+                pdu_cache_capacity
                     .try_into()
                    .expect("pdu cache capacity fits into usize"),
             )),
@@ -485,7 +519,7 @@ impl KeyValueDatabase {
                 clippy::cast_possible_truncation
             )]
             auth_chain_cache: Mutex::new(LruCache::new(
-                (100_000.0 * config.cache_capacity_modifier) as usize,
+                (100_000.0 * cache_capacity_modifier) as usize,
             )),
             #[allow(
                 clippy::as_conversions,
@@ -493,7 +527,7 @@ impl KeyValueDatabase {
                 clippy::cast_possible_truncation
             )]
             shorteventid_cache: Mutex::new(LruCache::new(
-                (100_000.0 * config.cache_capacity_modifier) as usize,
+                (100_000.0 * cache_capacity_modifier) as usize,
             )),
             #[allow(
                 clippy::as_conversions,
@@ -501,7 +535,7 @@ impl KeyValueDatabase {
                 clippy::cast_possible_truncation
            )]
             eventidshort_cache: Mutex::new(LruCache::new(
-                (100_000.0 * config.cache_capacity_modifier) as usize,
+                (100_000.0 * cache_capacity_modifier) as usize,
             )),
             #[allow(
                 clippy::as_conversions,
@@ -509,7 +543,7 @@ impl KeyValueDatabase {
                 clippy::cast_possible_truncation
             )]
             shortstatekey_cache: Mutex::new(LruCache::new(
-                (100_000.0 * config.cache_capacity_modifier) as usize,
+                (100_000.0 * cache_capacity_modifier) as usize,
             )),
             #[allow(
                 clippy::as_conversions,
@@ -517,7 +551,7 @@ impl KeyValueDatabase {
                 clippy::cast_possible_truncation
            )]
             statekeyshort_cache: Mutex::new(LruCache::new(
-                (100_000.0 * config.cache_capacity_modifier) as usize,
+                (100_000.0 * cache_capacity_modifier) as usize,
             )),
             our_real_users_cache: RwLock::new(HashMap::new()),
             appservice_in_room_cache: RwLock::new(HashMap::new()),

@@ -12,10 +12,6 @@ pub(crate) mod rocksdb;
 pub(crate) mod watchers;
 
 pub(crate) trait KeyValueDatabaseEngine: Send + Sync {
-    #[cfg(any(feature = "sqlite", feature = "rocksdb"))]
-    fn open(config: &super::Config) -> Result<Self>
-    where
-        Self: Sized;
     fn open_tree(&self, name: &'static str) -> Result<Arc<dyn KvTree>>;
     fn cleanup(&self) -> Result<()> {
         Ok(())

@@ -1,6 +1,7 @@
 use std::{
     collections::HashSet,
     future::Future,
+    path::Path,
     pin::Pin,
     sync::{Arc, Mutex, RwLock},
 };
@@ -13,9 +14,7 @@ use rocksdb::{
 };
 use tracing::Level;
 
-use super::{
-    super::Config, watchers::Watchers, KeyValueDatabaseEngine, KvTree,
-};
+use super::{watchers::Watchers, KeyValueDatabaseEngine, KvTree};
 use crate::{utils, Result};
 
 pub(crate) struct Engine {
@@ -70,50 +69,49 @@ fn db_options(max_open_files: i32, rocksdb_cache: &Cache) -> Options {
     db_opts
 }
 
-impl KeyValueDatabaseEngine for Arc<Engine> {
-    fn open(config: &Config) -> Result<Self> {
+impl Engine {
+    pub(crate) fn open(
+        path: &Path,
+        cache_capacity_mb: f64,
+        max_open_files: i32,
+    ) -> Result<Self> {
         #[allow(
             clippy::as_conversions,
             clippy::cast_sign_loss,
             clippy::cast_possible_truncation
         )]
         let cache_capacity_bytes =
-            (config.database.cache_capacity_mb * 1024.0 * 1024.0) as usize;
+            (cache_capacity_mb * 1024.0 * 1024.0) as usize;
         let rocksdb_cache = Cache::new_lru_cache(cache_capacity_bytes);
 
-        let db_opts =
-            db_options(config.database.rocksdb_max_open_files, &rocksdb_cache);
+        let db_opts = db_options(max_open_files, &rocksdb_cache);
 
-        let cfs = DBWithThreadMode::<MultiThreaded>::list_cf(
-            &db_opts,
-            &config.database.path,
-        )
-        .map(|x| x.into_iter().collect::<HashSet<_>>())
-        .unwrap_or_default();
+        let cfs = DBWithThreadMode::<MultiThreaded>::list_cf(&db_opts, path)
+            .map(|x| x.into_iter().collect::<HashSet<_>>())
+            .unwrap_or_default();
 
         let db = DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
             &db_opts,
-            &config.database.path,
+            path,
             cfs.iter().map(|name| {
                 ColumnFamilyDescriptor::new(
                     name,
-                    db_options(
-                        config.database.rocksdb_max_open_files,
-                        &rocksdb_cache,
-                    ),
+                    db_options(max_open_files, &rocksdb_cache),
                 )
             }),
        )?;
 
-        Ok(Arc::new(Engine {
+        Ok(Engine {
             rocks: db,
-            max_open_files: config.database.rocksdb_max_open_files,
+            max_open_files,
             cache: rocksdb_cache,
             old_cfs: cfs,
             new_cfs: Mutex::default(),
-        }))
+        })
     }
+}
 
+impl KeyValueDatabaseEngine for Arc<Engine> {
     fn open_tree(&self, name: &'static str) -> Result<Arc<dyn KvTree>> {
         let mut new_cfs =
             self.new_cfs.lock().expect("lock should not be poisoned");

@@ -12,7 +12,7 @@ use thread_local::ThreadLocal;
 use tracing::debug;
 
 use super::{watchers::Watchers, KeyValueDatabaseEngine, KvTree};
-use crate::{database::Config, Result};
+use crate::Result;
 
 thread_local! {
     static READ_CONNECTION: RefCell<Option<&'static Connection>> =
@@ -66,6 +66,47 @@ pub(crate) struct Engine {
 }
 
 impl Engine {
+    pub(crate) fn open(
+        path: &Path,
+        cache_capacity_mb: f64,
+        conduit_compat: bool,
+    ) -> Result<Self> {
+        let path = path.join(format!(
+            "{}.db",
+            if conduit_compat {
+                "conduit"
+            } else {
+                "grapevine"
+            }
+        ));
+
+        // calculates cache-size per permanent connection
+        // 1. convert MB to KiB
+        // 2. divide by permanent connections + permanent iter connections +
+        //    write connection
+        // 3. round down to nearest integer
+        #[allow(
+            clippy::as_conversions,
+            clippy::cast_possible_truncation,
+            clippy::cast_precision_loss,
+            clippy::cast_sign_loss
+        )]
+        let cache_size_per_thread = ((cache_capacity_mb * 1024.0)
+            / ((num_cpus::get() as f64 * 2.0) + 1.0))
+            as u32;
+
+        let writer =
+            Mutex::new(Engine::prepare_conn(&path, cache_size_per_thread)?);
+
+        Ok(Engine {
+            writer,
+            read_conn_tls: ThreadLocal::new(),
+            read_iterator_conn_tls: ThreadLocal::new(),
+            path,
+            cache_size_per_thread,
+        })
+    }
+
     fn prepare_conn(path: &Path, cache_size_kb: u32) -> Result<Connection> {
         let conn = Connection::open(path)?;
@@ -109,45 +150,6 @@ impl Engine {
 }
 
 impl KeyValueDatabaseEngine for Arc<Engine> {
-    fn open(config: &Config) -> Result<Self> {
-        let path = Path::new(&config.database.path).join(format!(
-            "{}.db",
-            if config.conduit_compat {
-                "conduit"
-            } else {
-                "grapevine"
-            }
-        ));
-
-        // calculates cache-size per permanent connection
-        // 1. convert MB to KiB
-        // 2. divide by permanent connections + permanent iter connections +
-        //    write connection
-        // 3. round down to nearest integer
-        #[allow(
-            clippy::as_conversions,
-            clippy::cast_possible_truncation,
-            clippy::cast_precision_loss,
-            clippy::cast_sign_loss
-        )]
-        let cache_size_per_thread =
-            ((config.database.cache_capacity_mb * 1024.0)
-                / ((num_cpus::get() as f64 * 2.0) + 1.0)) as u32;
-
-        let writer =
-            Mutex::new(Engine::prepare_conn(&path, cache_size_per_thread)?);
-
-        let arc = Arc::new(Engine {
-            writer,
-            read_conn_tls: ThreadLocal::new(),
-            read_iterator_conn_tls: ThreadLocal::new(),
-            path,
-            cache_size_per_thread,
-        });
-
-        Ok(arc)
-    }
-
     fn open_tree(&self, name: &str) -> Result<Arc<dyn KvTree>> {
         self.write_lock().execute(
             &format!(
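The cache-size comment in the SQLite `Engine::open` above is easiest to follow with numbers plugged in. A small self-contained check of the same arithmetic; the 300 MB cache and the 8-CPU machine are illustrative values, not figures from this MR:

fn main() {
    // Same formula as the sqlite Engine::open above: convert MB to KiB,
    // then split across one read connection and one read-iterator
    // connection per CPU, plus the single write connection.
    let cache_capacity_mb = 300.0_f64; // illustrative value
    let cpus = 8.0_f64; // illustrative value
    let cache_size_per_thread =
        ((cache_capacity_mb * 1024.0) / ((cpus * 2.0) + 1.0)) as u32;

    // 300 * 1024 = 307200 KiB shared by 17 connections -> 18070 KiB each,
    // rounded down by the `as u32` cast.
    assert_eq!(cache_size_per_thread, 18070);
}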

@@ -42,6 +42,8 @@ impl fmt::Display for DisplayWithSources<'_> {
 pub(crate) enum Main {
     #[error(transparent)]
     ServeCommand(#[from] ServeCommand),
+    #[error(transparent)]
+    DbConvert(#[from] DbConvert),
 }
 
 /// Errors returned from the `serve` CLI subcommand.
@@ -167,3 +169,6 @@ pub(crate) enum Serve {
     )]
     FederationSelfTestFailed(#[source] crate::Error),
 }
+
+/// Errors converting between database backends
+pub(crate) type DbConvert = Box<dyn std::error::Error>;

@@ -299,7 +299,12 @@ impl Service {
         if !s.supported_room_versions().contains(&s.config.default_room_version)
         {
-            error!(config=?s.config.default_room_version, fallback=?crate::config::default_default_room_version(), "Room version in config isn't supported, falling back to default version");
+            error!(
+                config=?s.config.default_room_version,
+                fallback=?crate::config::default_default_room_version(),
+                "Room version in config isn't supported, falling back to \
+                 default version",
+            );
             s.config.default_room_version =
                 crate::config::default_default_room_version();
         };
@@ -610,9 +615,7 @@ impl Service {
     }
 
     pub(crate) fn get_media_file(&self, key: &MediaFileKey) -> PathBuf {
-        let mut r = PathBuf::new();
-        r.push(self.config.database.path.clone());
-        r.push("media");
+        let mut r = self.get_media_folder();
         r.push(general_purpose::URL_SAFE_NO_PAD.encode(key.as_bytes()));
         r
     }