Merge branch 'charles/db-refactorings' into 'main'

Various DB refactorings

See merge request matrix/grapevine!117
Charles Hall 2024-10-27 11:21:19 +00:00
commit b391614d90
7 changed files with 128 additions and 189 deletions

View file

@@ -91,7 +91,7 @@ pub(crate) async fn run(args: ServeArgs) -> Result<(), error::ServeCommand> {
     info!("Loading database");
     let db = Box::leak(Box::new(
-        KeyValueDatabase::load_or_create(&config)
+        KeyValueDatabase::load_or_create(&config.database)
             .map_err(Error::DatabaseError)?,
     ));

View file

@@ -243,35 +243,53 @@ impl Default for TurnConfig {
     }
 }

-#[derive(Clone, Copy, Debug, Deserialize)]
-#[serde(rename_all = "lowercase")]
-pub(crate) enum DatabaseBackend {
-    #[cfg(feature = "rocksdb")]
-    Rocksdb,
-    #[cfg(feature = "sqlite")]
-    Sqlite,
-}
-
-impl Display for DatabaseBackend {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        match *self {
-            #[cfg(feature = "rocksdb")]
-            DatabaseBackend::Rocksdb => write!(f, "RocksDB"),
-            #[cfg(feature = "sqlite")]
-            DatabaseBackend::Sqlite => write!(f, "SQLite"),
-        }
-    }
-}
-
-#[derive(Clone, Debug, Deserialize)]
-pub(crate) struct DatabaseConfig {
-    pub(crate) backend: DatabaseBackend,
-    pub(crate) path: String,
-    #[serde(default = "default_db_cache_capacity_mb")]
-    pub(crate) cache_capacity_mb: f64,
-    #[cfg(feature = "rocksdb")]
-    #[serde(default = "default_rocksdb_max_open_files")]
-    pub(crate) rocksdb_max_open_files: i32,
-}
+#[cfg(feature = "rocksdb")]
+#[derive(Clone, Debug, Deserialize)]
+pub(crate) struct RocksdbConfig {
+    pub(crate) path: PathBuf,
+    #[serde(default = "default_rocksdb_max_open_files")]
+    pub(crate) max_open_files: i32,
+    #[serde(default = "default_rocksdb_cache_capacity_bytes")]
+    pub(crate) cache_capacity_bytes: usize,
+}
+
+#[cfg(feature = "sqlite")]
+#[derive(Clone, Debug, Deserialize)]
+pub(crate) struct SqliteConfig {
+    pub(crate) path: PathBuf,
+    #[serde(default = "default_sqlite_cache_capacity_kilobytes")]
+    pub(crate) cache_capacity_kilobytes: u32,
+}
+
+#[derive(Clone, Debug, Deserialize)]
+#[serde(rename_all = "lowercase", tag = "backend")]
+pub(crate) enum DatabaseConfig {
+    #[cfg(feature = "rocksdb")]
+    Rocksdb(RocksdbConfig),
+    #[cfg(feature = "sqlite")]
+    Sqlite(SqliteConfig),
+}
+
+impl DatabaseConfig {
+    pub(crate) fn path(&self) -> &Path {
+        match self {
+            #[cfg(feature = "rocksdb")]
+            DatabaseConfig::Rocksdb(x) => &x.path,
+            #[cfg(feature = "sqlite")]
+            DatabaseConfig::Sqlite(x) => &x.path,
+        }
+    }
+}
+
+impl Display for DatabaseConfig {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match *self {
+            #[cfg(feature = "rocksdb")]
+            DatabaseConfig::Rocksdb(_) => write!(f, "RocksDB"),
+            #[cfg(feature = "sqlite")]
+            DatabaseConfig::Sqlite(_) => write!(f, "SQLite"),
+        }
+    }
+}

 #[derive(Clone, Debug, Default, Deserialize)]
@@ -405,10 +423,6 @@ fn default_port() -> u16 {
     6167
 }

-fn default_db_cache_capacity_mb() -> f64 {
-    300.0
-}
-
 fn default_cache_capacity_modifier() -> f64 {
     1.0
 }
@@ -418,6 +432,16 @@ fn default_rocksdb_max_open_files() -> i32 {
     1000
 }

+#[cfg(feature = "rocksdb")]
+fn default_rocksdb_cache_capacity_bytes() -> usize {
+    300 * 1024 * 1024
+}
+
+#[cfg(feature = "sqlite")]
+fn default_sqlite_cache_capacity_kilobytes() -> u32 {
+    300 * 1024
+}
+
 fn default_pdu_cache_capacity() -> usize {
     150_000
 }
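
Note on the new shape: tag = "backend" turns DatabaseConfig into an internally
tagged enum, so the old flat backend/path/cache fields become one section whose
backend key selects the variant and whose remaining keys fill that variant's
struct. Below is a minimal sketch of the resulting deserialization, assuming
the config file is TOML (as elsewhere in the Conduit family) and using the
serde and toml crates directly; the cfg gates are dropped, the field set is
trimmed, and the path is a made-up example:

use std::path::PathBuf;

use serde::Deserialize;

// Trimmed copy of the structs from the diff above.
#[derive(Debug, Deserialize)]
struct RocksdbConfig {
    path: PathBuf,
    #[serde(default = "default_rocksdb_cache_capacity_bytes")]
    cache_capacity_bytes: usize,
}

fn default_rocksdb_cache_capacity_bytes() -> usize {
    300 * 1024 * 1024
}

#[derive(Debug, Deserialize)]
#[serde(rename_all = "lowercase", tag = "backend")]
enum DatabaseConfig {
    Rocksdb(RocksdbConfig),
}

fn main() {
    // The `backend` key picks the enum variant; every other key lands in
    // the variant's struct. `cache_capacity_bytes` is omitted here, so the
    // serde default (300 MiB) applies.
    let config: DatabaseConfig = toml::from_str(
        r#"
            backend = "rocksdb"
            path = "/var/lib/grapevine"
        "#,
    )
    .expect("example config should deserialize");

    println!("{config:?}");
}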

View file

@@ -3,7 +3,6 @@ use std::{
     fs,
     io::Write,
     mem::size_of,
-    path::Path,
     sync::Arc,
 };
@@ -15,7 +14,7 @@ use ruma::{
 use tracing::{debug, error, info, info_span, warn, Instrument};

 use crate::{
-    config::DatabaseBackend,
+    config::DatabaseConfig,
     service::{
         media::MediaFileKey,
         rooms::{
@@ -23,7 +22,7 @@ use crate::{
             state_compressor::CompressedStateEvent,
         },
     },
-    services, utils, Config, Error, Result,
+    services, utils, Error, Result,
 };

 pub(crate) mod abstraction;
@@ -233,96 +232,38 @@ pub(crate) struct KeyValueDatabase {
 }

 impl KeyValueDatabase {
-    fn check_db_setup(config: &Config) -> Result<()> {
-        let path = Path::new(&config.database.path);
-
-        let sqlite_exists = path
-            .join(format!(
-                "{}.db",
-                if config.conduit_compat {
-                    "conduit"
-                } else {
-                    "grapevine"
-                }
-            ))
-            .exists();
-        let rocksdb_exists = path.join("IDENTITY").exists();
-
-        let mut count = 0;
-
-        if sqlite_exists {
-            count += 1;
-        }
-
-        if rocksdb_exists {
-            count += 1;
-        }
-
-        if count > 1 {
-            warn!("Multiple databases at database_path detected");
-            return Ok(());
-        }
-
-        let (backend_is_rocksdb, backend_is_sqlite): (bool, bool) =
-            match config.database.backend {
-                #[cfg(feature = "rocksdb")]
-                DatabaseBackend::Rocksdb => (true, false),
-                #[cfg(feature = "sqlite")]
-                DatabaseBackend::Sqlite => (false, true),
-            };
-
-        if sqlite_exists && !backend_is_sqlite {
-            return Err(Error::bad_config(
-                "Found sqlite at database_path, but is not specified in \
-                 config.",
-            ));
-        }
-
-        if rocksdb_exists && !backend_is_rocksdb {
-            return Err(Error::bad_config(
-                "Found rocksdb at database_path, but is not specified in \
-                 config.",
-            ));
-        }
-
-        Ok(())
-    }
-
-    /// Load an existing database or create a new one, and initialize all
-    /// services with the loaded database.
+    pub(crate) fn load_or_create_engine(
+        config: &DatabaseConfig,
+    ) -> Result<Arc<dyn KeyValueDatabaseEngine>> {
+        #[cfg(not(any(feature = "rocksdb", feature = "sqlite")))]
+        return Err(Error::BadConfig(
+            "Compiled without support for any databases",
+        ));
+
+        let x: Arc<dyn KeyValueDatabaseEngine> = match config {
+            #[cfg(feature = "sqlite")]
+            DatabaseConfig::Sqlite(config) => {
+                Arc::new(Arc::new(abstraction::sqlite::Engine::open(config)?))
+            }
+            #[cfg(feature = "rocksdb")]
+            DatabaseConfig::Rocksdb(config) => {
+                Arc::new(Arc::new(abstraction::rocksdb::Engine::open(config)?))
+            }
+        };
+
+        #[cfg(any(feature = "rocksdb", feature = "sqlite"))]
+        return Ok(x);
+    }
+
     #[cfg_attr(
         not(any(feature = "rocksdb", feature = "sqlite")),
         allow(unreachable_code)
     )]
     #[allow(clippy::too_many_lines)]
-    pub(crate) fn load_or_create(config: &Config) -> Result<KeyValueDatabase> {
-        Self::check_db_setup(config)?;
-
-        if !Path::new(&config.database.path).exists() {
-            fs::create_dir_all(&config.database.path).map_err(|_| {
-                Error::BadConfig(
-                    "Database folder doesn't exists and couldn't be created \
-                     (e.g. due to missing permissions). Please create the \
-                     database folder yourself.",
-                )
-            })?;
-        }
-
-        #[cfg_attr(
-            not(any(feature = "rocksdb", feature = "sqlite")),
-            allow(unused_variables)
-        )]
-        let builder: Arc<dyn KeyValueDatabaseEngine> =
-            match config.database.backend {
-                #[cfg(feature = "sqlite")]
-                DatabaseBackend::Sqlite => {
-                    Arc::new(Arc::<abstraction::sqlite::Engine>::open(config)?)
-                }
-                #[cfg(feature = "rocksdb")]
-                DatabaseBackend::Rocksdb => {
-                    Arc::new(Arc::<abstraction::rocksdb::Engine>::open(config)?)
-                }
-            };
+    pub(crate) fn load_or_create(
+        config: &DatabaseConfig,
+    ) -> Result<KeyValueDatabase> {
+        let builder = Self::load_or_create_engine(config)?;

         let db = Self {
             db: builder.clone(),
@@ -1004,7 +945,7 @@ impl KeyValueDatabase {
         );

         info!(
-            backend = %services().globals.config.database.backend,
+            backend = %services().globals.config.database,
             version = latest_database_version,
             "Loaded database",
         );
@@ -1017,7 +958,7 @@ impl KeyValueDatabase {
             services().admin.create_admin_room().await?;

             info!(
-                backend = %services().globals.config.database.backend,
+                backend = %services().globals.config.database,
                 version = latest_database_version,
                 "Created new database",
             );
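
Two behavioral notes on this hunk: the old check_db_setup heuristic, which
sniffed the database directory for a stray conduit/grapevine .db file or a
RocksDB IDENTITY file and refused to start on a mismatch, is gone along with
the shared path field, since each backend now carries its own path. The
unconditional fs::create_dir_all also moves out of load_or_create; the SQLite
engine now creates its directory itself in Engine::open (later in this diff),
and RocksDB presumably handles directory creation through its own options.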

View file

@@ -12,10 +12,6 @@ pub(crate) mod rocksdb;
 pub(crate) mod watchers;

 pub(crate) trait KeyValueDatabaseEngine: Send + Sync {
-    #[cfg(any(feature = "sqlite", feature = "rocksdb"))]
-    fn open(config: &super::Config) -> Result<Self>
-    where
-        Self: Sized;
     fn open_tree(&self, name: &'static str) -> Result<Arc<dyn KvTree>>;
     fn cleanup(&self) -> Result<()> {
         Ok(())
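
Moving open out of the trait and onto each concrete Engine is what makes the
per-backend config structs workable: a trait method has to accept one common
config type (it previously took the whole super::Config), while inherent open
methods can each take their own RocksdbConfig or SqliteConfig. The trait keeps
only the operations that are uniform across backends.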

View file

@@ -12,10 +12,8 @@ use rocksdb::{
 };
 use tracing::Level;

-use super::{
-    super::Config, watchers::Watchers, KeyValueDatabaseEngine, KvTree,
-};
-use crate::{utils, Result};
+use super::{watchers::Watchers, KeyValueDatabaseEngine, KvTree};
+use crate::{config::RocksdbConfig, utils, Result};

 pub(crate) struct Engine {
     rocks: DBWithThreadMode<MultiThreaded>,
@@ -69,50 +67,44 @@ fn db_options(max_open_files: i32, rocksdb_cache: &Cache) -> Options {
     db_opts
 }

-impl KeyValueDatabaseEngine for Arc<Engine> {
-    fn open(config: &Config) -> Result<Self> {
+impl Engine {
+    pub(crate) fn open(config: &RocksdbConfig) -> Result<Self> {
         #[allow(
             clippy::as_conversions,
             clippy::cast_sign_loss,
             clippy::cast_possible_truncation
         )]
-        let cache_capacity_bytes =
-            (config.database.cache_capacity_mb * 1024.0 * 1024.0) as usize;
-        let rocksdb_cache = Cache::new_lru_cache(cache_capacity_bytes);
+        let rocksdb_cache = Cache::new_lru_cache(config.cache_capacity_bytes);

-        let db_opts =
-            db_options(config.database.rocksdb_max_open_files, &rocksdb_cache);
+        let db_opts = db_options(config.max_open_files, &rocksdb_cache);

-        let cfs = DBWithThreadMode::<MultiThreaded>::list_cf(
-            &db_opts,
-            &config.database.path,
-        )
-        .map(|x| x.into_iter().collect::<HashSet<_>>())
-        .unwrap_or_default();
+        let cfs =
+            DBWithThreadMode::<MultiThreaded>::list_cf(&db_opts, &config.path)
+                .map(|x| x.into_iter().collect::<HashSet<_>>())
+                .unwrap_or_default();

         let db = DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
             &db_opts,
-            &config.database.path,
+            &config.path,
             cfs.iter().map(|name| {
                 ColumnFamilyDescriptor::new(
                     name,
-                    db_options(
-                        config.database.rocksdb_max_open_files,
-                        &rocksdb_cache,
-                    ),
+                    db_options(config.max_open_files, &rocksdb_cache),
                 )
             }),
         )?;

-        Ok(Arc::new(Engine {
+        Ok(Engine {
             rocks: db,
-            max_open_files: config.database.rocksdb_max_open_files,
+            max_open_files: config.max_open_files,
             cache: rocksdb_cache,
             old_cfs: cfs,
             new_cfs: Mutex::default(),
-        }))
+        })
     }
+}

+impl KeyValueDatabaseEngine for Arc<Engine> {
     fn open_tree(&self, name: &'static str) -> Result<Arc<dyn KvTree>> {
         let mut new_cfs =
             self.new_cfs.lock().expect("lock should not be poisoned");

View file

@@ -1,5 +1,6 @@
 use std::{
     cell::RefCell,
+    fs,
     future::Future,
     path::{Path, PathBuf},
     pin::Pin,
@@ -12,7 +13,7 @@ use thread_local::ThreadLocal;
 use tracing::debug;

 use super::{watchers::Watchers, KeyValueDatabaseEngine, KvTree};
-use crate::{database::Config, Result};
+use crate::{config::SqliteConfig, Result};

 thread_local! {
     static READ_CONNECTION: RefCell<Option<&'static Connection>> =
@@ -62,10 +63,29 @@ pub(crate) struct Engine {
     read_iterator_conn_tls: ThreadLocal<Connection>,
     path: PathBuf,
-    cache_size_per_thread: u32,
+    cache_capacity_kilobytes: u32,
 }

 impl Engine {
+    pub(crate) fn open(config: &SqliteConfig) -> Result<Self> {
+        fs::create_dir_all(&config.path)?;
+
+        let path = config.path.join("sqlite.db");
+
+        let writer = Mutex::new(Engine::prepare_conn(
+            &path,
+            config.cache_capacity_kilobytes,
+        )?);
+
+        Ok(Engine {
+            writer,
+            read_conn_tls: ThreadLocal::new(),
+            read_iterator_conn_tls: ThreadLocal::new(),
+            path,
+            cache_capacity_kilobytes: config.cache_capacity_kilobytes,
+        })
+    }
+
     fn prepare_conn(path: &Path, cache_size_kb: u32) -> Result<Connection> {
         let conn = Connection::open(path)?;
@@ -88,13 +108,15 @@ impl Engine {
     fn read_lock(&self) -> &Connection {
         self.read_conn_tls.get_or(|| {
-            Self::prepare_conn(&self.path, self.cache_size_per_thread).unwrap()
+            Self::prepare_conn(&self.path, self.cache_capacity_kilobytes)
+                .unwrap()
         })
     }

     fn read_lock_iterator(&self) -> &Connection {
         self.read_iterator_conn_tls.get_or(|| {
-            Self::prepare_conn(&self.path, self.cache_size_per_thread).unwrap()
+            Self::prepare_conn(&self.path, self.cache_capacity_kilobytes)
+                .unwrap()
         })
     }
@@ -109,45 +131,6 @@ impl Engine {
 }

 impl KeyValueDatabaseEngine for Arc<Engine> {
-    fn open(config: &Config) -> Result<Self> {
-        let path = Path::new(&config.database.path).join(format!(
-            "{}.db",
-            if config.conduit_compat {
-                "conduit"
-            } else {
-                "grapevine"
-            }
-        ));
-
-        // calculates cache-size per permanent connection
-        // 1. convert MB to KiB
-        // 2. divide by permanent connections + permanent iter connections +
-        //    write connection
-        // 3. round down to nearest integer
-        #[allow(
-            clippy::as_conversions,
-            clippy::cast_possible_truncation,
-            clippy::cast_precision_loss,
-            clippy::cast_sign_loss
-        )]
-        let cache_size_per_thread =
-            ((config.database.cache_capacity_mb * 1024.0)
-                / ((num_cpus::get() as f64 * 2.0) + 1.0)) as u32;
-
-        let writer =
-            Mutex::new(Engine::prepare_conn(&path, cache_size_per_thread)?);
-
-        let arc = Arc::new(Engine {
-            writer,
-            read_conn_tls: ThreadLocal::new(),
-            read_iterator_conn_tls: ThreadLocal::new(),
-            path,
-            cache_size_per_thread,
-        });
-
-        Ok(arc)
-    }
-
     fn open_tree(&self, name: &str) -> Result<Arc<dyn KvTree>> {
         self.write_lock().execute(
             &format!(
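
Note the semantic shift hiding in this deletion: cache_capacity_mb was a
global budget split across the write connection and the per-thread read and
iterator connections (with the old default of 300 MB on an 8-core machine,
300 * 1024 KiB / (2 * 8 + 1) is roughly 18 MiB per connection), whereas
cache_capacity_kilobytes is now passed to every connection as-is, so the new
default of 300 * 1024 KiB means roughly 300 MiB for each connection.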

View file

@@ -299,7 +299,12 @@ impl Service {
         if !s.supported_room_versions().contains(&s.config.default_room_version)
         {
-            error!(config=?s.config.default_room_version, fallback=?crate::config::default_default_room_version(), "Room version in config isn't supported, falling back to default version");
+            error!(
+                config=?s.config.default_room_version,
+                fallback=?crate::config::default_default_room_version(),
+                "Room version in config isn't supported, falling back to \
+                 default version",
+            );
             s.config.default_room_version =
                 crate::config::default_default_room_version();
         };
@@ -604,15 +609,13 @@ impl Service {
     pub(crate) fn get_media_folder(&self) -> PathBuf {
         let mut r = PathBuf::new();
-        r.push(self.config.database.path.clone());
+        r.push(self.config.database.path());
         r.push("media");
         r
     }

     pub(crate) fn get_media_file(&self, key: &MediaFileKey) -> PathBuf {
-        let mut r = PathBuf::new();
-        r.push(self.config.database.path.clone());
-        r.push("media");
+        let mut r = self.get_media_folder();
         r.push(general_purpose::URL_SAFE_NO_PAD.encode(key.as_bytes()));
         r
     }