mirror of
https://gitlab.computer.surgery/matrix/grapevine.git
synced 2025-12-16 23:31:24 +01:00
make database config a sum type
This way we don't need to construct the entire configuration to load a database or database engine. The other advantage is that it allows having options that are unique to each database backend. The one thing I don't like about this is `DatabaseConfig::path`, whose existence implies all databases will have a file path, which is not true for out-of-process databases. The only thing this is really used for is creating the media directory. I think we should restructure the configuration in the future to resolve this.
This commit is contained in:
parent
f5ba3e3062
commit
153e3e4c93
6 changed files with 88 additions and 83 deletions
|
|
@ -76,7 +76,7 @@ pub(crate) async fn run(args: ServeArgs) -> Result<(), error::ServeCommand> {
|
||||||
|
|
||||||
info!("Loading database");
|
info!("Loading database");
|
||||||
let db = Box::leak(Box::new(
|
let db = Box::leak(Box::new(
|
||||||
KeyValueDatabase::load_or_create(&config)
|
KeyValueDatabase::load_or_create(&config.database)
|
||||||
.map_err(Error::DatabaseError)?,
|
.map_err(Error::DatabaseError)?,
|
||||||
));
|
));
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -216,35 +216,53 @@ impl Default for TurnConfig {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Copy, Debug, Deserialize)]
|
#[cfg(feature = "rocksdb")]
|
||||||
#[serde(rename_all = "lowercase")]
|
#[derive(Clone, Debug, Deserialize)]
|
||||||
pub(crate) enum DatabaseBackend {
|
pub(crate) struct RocksdbConfig {
|
||||||
#[cfg(feature = "rocksdb")]
|
pub(crate) path: PathBuf,
|
||||||
Rocksdb,
|
#[serde(default = "default_rocksdb_max_open_files")]
|
||||||
#[cfg(feature = "sqlite")]
|
pub(crate) max_open_files: i32,
|
||||||
Sqlite,
|
#[serde(default = "default_rocksdb_cache_capacity_bytes")]
|
||||||
|
pub(crate) cache_capacity_bytes: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Display for DatabaseBackend {
|
#[cfg(feature = "sqlite")]
|
||||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
#[derive(Clone, Debug, Deserialize)]
|
||||||
match *self {
|
pub(crate) struct SqliteConfig {
|
||||||
|
pub(crate) path: PathBuf,
|
||||||
|
#[serde(default = "default_sqlite_cache_capacity_kilobytes")]
|
||||||
|
pub(crate) cache_capacity_kilobytes: u32,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, Deserialize)]
|
||||||
|
#[serde(rename_all = "lowercase", tag = "backend")]
|
||||||
|
pub(crate) enum DatabaseConfig {
|
||||||
|
#[cfg(feature = "rocksdb")]
|
||||||
|
Rocksdb(RocksdbConfig),
|
||||||
|
#[cfg(feature = "sqlite")]
|
||||||
|
Sqlite(SqliteConfig),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl DatabaseConfig {
|
||||||
|
pub(crate) fn path(&self) -> &Path {
|
||||||
|
match self {
|
||||||
#[cfg(feature = "rocksdb")]
|
#[cfg(feature = "rocksdb")]
|
||||||
DatabaseBackend::Rocksdb => write!(f, "RocksDB"),
|
DatabaseConfig::Rocksdb(x) => &x.path,
|
||||||
#[cfg(feature = "sqlite")]
|
#[cfg(feature = "sqlite")]
|
||||||
DatabaseBackend::Sqlite => write!(f, "SQLite"),
|
DatabaseConfig::Sqlite(x) => &x.path,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug, Deserialize)]
|
impl Display for DatabaseConfig {
|
||||||
pub(crate) struct DatabaseConfig {
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
pub(crate) backend: DatabaseBackend,
|
match *self {
|
||||||
pub(crate) path: String,
|
#[cfg(feature = "rocksdb")]
|
||||||
#[serde(default = "default_db_cache_capacity_mb")]
|
DatabaseConfig::Rocksdb(_) => write!(f, "RocksDB"),
|
||||||
pub(crate) cache_capacity_mb: f64,
|
#[cfg(feature = "sqlite")]
|
||||||
#[cfg(feature = "rocksdb")]
|
DatabaseConfig::Sqlite(_) => write!(f, "SQLite"),
|
||||||
#[serde(default = "default_rocksdb_max_open_files")]
|
}
|
||||||
pub(crate) rocksdb_max_open_files: i32,
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug, Default, Deserialize)]
|
#[derive(Clone, Debug, Default, Deserialize)]
|
||||||
|
|
@ -377,10 +395,6 @@ fn default_port() -> u16 {
|
||||||
6167
|
6167
|
||||||
}
|
}
|
||||||
|
|
||||||
fn default_db_cache_capacity_mb() -> f64 {
|
|
||||||
300.0
|
|
||||||
}
|
|
||||||
|
|
||||||
fn default_cache_capacity_modifier() -> f64 {
|
fn default_cache_capacity_modifier() -> f64 {
|
||||||
1.0
|
1.0
|
||||||
}
|
}
|
||||||
|
|
@ -390,6 +404,16 @@ fn default_rocksdb_max_open_files() -> i32 {
|
||||||
1000
|
1000
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "rocksdb")]
|
||||||
|
fn default_rocksdb_cache_capacity_bytes() -> usize {
|
||||||
|
300 * 1024 * 1024
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "sqlite")]
|
||||||
|
fn default_sqlite_cache_capacity_kilobytes() -> u32 {
|
||||||
|
300 * 1024
|
||||||
|
}
|
||||||
|
|
||||||
fn default_pdu_cache_capacity() -> usize {
|
fn default_pdu_cache_capacity() -> usize {
|
||||||
150_000
|
150_000
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -14,7 +14,7 @@ use ruma::{
|
||||||
use tracing::{debug, error, info, info_span, warn, Instrument};
|
use tracing::{debug, error, info, info_span, warn, Instrument};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
config::DatabaseBackend,
|
config::DatabaseConfig,
|
||||||
service::{
|
service::{
|
||||||
media::MediaFileKey,
|
media::MediaFileKey,
|
||||||
rooms::{
|
rooms::{
|
||||||
|
|
@ -22,7 +22,7 @@ use crate::{
|
||||||
state_compressor::CompressedStateEvent,
|
state_compressor::CompressedStateEvent,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
services, utils, Config, Error, Result,
|
services, utils, Error, Result,
|
||||||
};
|
};
|
||||||
|
|
||||||
pub(crate) mod abstraction;
|
pub(crate) mod abstraction;
|
||||||
|
|
@ -233,20 +233,20 @@ pub(crate) struct KeyValueDatabase {
|
||||||
|
|
||||||
impl KeyValueDatabase {
|
impl KeyValueDatabase {
|
||||||
pub(crate) fn load_or_create_engine(
|
pub(crate) fn load_or_create_engine(
|
||||||
config: &Config,
|
config: &DatabaseConfig,
|
||||||
) -> Result<Arc<dyn KeyValueDatabaseEngine>> {
|
) -> Result<Arc<dyn KeyValueDatabaseEngine>> {
|
||||||
#[cfg(not(any(feature = "rocksdb", feature = "sqlite")))]
|
#[cfg(not(any(feature = "rocksdb", feature = "sqlite")))]
|
||||||
return Err(Error::BadConfig(
|
return Err(Error::BadConfig(
|
||||||
"Compiled without support for any databases",
|
"Compiled without support for any databases",
|
||||||
));
|
));
|
||||||
|
|
||||||
let x: Arc<dyn KeyValueDatabaseEngine> = match config.database.backend {
|
let x: Arc<dyn KeyValueDatabaseEngine> = match config {
|
||||||
#[cfg(feature = "sqlite")]
|
#[cfg(feature = "sqlite")]
|
||||||
DatabaseBackend::Sqlite => {
|
DatabaseConfig::Sqlite(config) => {
|
||||||
Arc::new(Arc::new(abstraction::sqlite::Engine::open(config)?))
|
Arc::new(Arc::new(abstraction::sqlite::Engine::open(config)?))
|
||||||
}
|
}
|
||||||
#[cfg(feature = "rocksdb")]
|
#[cfg(feature = "rocksdb")]
|
||||||
DatabaseBackend::Rocksdb => {
|
DatabaseConfig::Rocksdb(config) => {
|
||||||
Arc::new(Arc::new(abstraction::rocksdb::Engine::open(config)?))
|
Arc::new(Arc::new(abstraction::rocksdb::Engine::open(config)?))
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
@ -260,7 +260,9 @@ impl KeyValueDatabase {
|
||||||
allow(unreachable_code)
|
allow(unreachable_code)
|
||||||
)]
|
)]
|
||||||
#[allow(clippy::too_many_lines)]
|
#[allow(clippy::too_many_lines)]
|
||||||
pub(crate) fn load_or_create(config: &Config) -> Result<KeyValueDatabase> {
|
pub(crate) fn load_or_create(
|
||||||
|
config: &DatabaseConfig,
|
||||||
|
) -> Result<KeyValueDatabase> {
|
||||||
let builder = Self::load_or_create_engine(config)?;
|
let builder = Self::load_or_create_engine(config)?;
|
||||||
|
|
||||||
let db = Self {
|
let db = Self {
|
||||||
|
|
@ -943,7 +945,7 @@ impl KeyValueDatabase {
|
||||||
);
|
);
|
||||||
|
|
||||||
info!(
|
info!(
|
||||||
backend = %services().globals.config.database.backend,
|
backend = %services().globals.config.database,
|
||||||
version = latest_database_version,
|
version = latest_database_version,
|
||||||
"Loaded database",
|
"Loaded database",
|
||||||
);
|
);
|
||||||
|
|
@ -956,7 +958,7 @@ impl KeyValueDatabase {
|
||||||
services().admin.create_admin_room().await?;
|
services().admin.create_admin_room().await?;
|
||||||
|
|
||||||
info!(
|
info!(
|
||||||
backend = %services().globals.config.database.backend,
|
backend = %services().globals.config.database,
|
||||||
version = latest_database_version,
|
version = latest_database_version,
|
||||||
"Created new database",
|
"Created new database",
|
||||||
);
|
);
|
||||||
|
|
|
||||||
|
|
@ -12,10 +12,8 @@ use rocksdb::{
|
||||||
};
|
};
|
||||||
use tracing::Level;
|
use tracing::Level;
|
||||||
|
|
||||||
use super::{
|
use super::{watchers::Watchers, KeyValueDatabaseEngine, KvTree};
|
||||||
super::Config, watchers::Watchers, KeyValueDatabaseEngine, KvTree,
|
use crate::{config::RocksdbConfig, utils, Result};
|
||||||
};
|
|
||||||
use crate::{utils, Result};
|
|
||||||
|
|
||||||
pub(crate) struct Engine {
|
pub(crate) struct Engine {
|
||||||
rocks: DBWithThreadMode<MultiThreaded>,
|
rocks: DBWithThreadMode<MultiThreaded>,
|
||||||
|
|
@ -70,43 +68,35 @@ fn db_options(max_open_files: i32, rocksdb_cache: &Cache) -> Options {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Engine {
|
impl Engine {
|
||||||
pub(crate) fn open(config: &Config) -> Result<Self> {
|
pub(crate) fn open(config: &RocksdbConfig) -> Result<Self> {
|
||||||
#[allow(
|
#[allow(
|
||||||
clippy::as_conversions,
|
clippy::as_conversions,
|
||||||
clippy::cast_sign_loss,
|
clippy::cast_sign_loss,
|
||||||
clippy::cast_possible_truncation
|
clippy::cast_possible_truncation
|
||||||
)]
|
)]
|
||||||
let cache_capacity_bytes =
|
let rocksdb_cache = Cache::new_lru_cache(config.cache_capacity_bytes);
|
||||||
(config.database.cache_capacity_mb * 1024.0 * 1024.0) as usize;
|
|
||||||
let rocksdb_cache = Cache::new_lru_cache(cache_capacity_bytes);
|
|
||||||
|
|
||||||
let db_opts =
|
let db_opts = db_options(config.max_open_files, &rocksdb_cache);
|
||||||
db_options(config.database.rocksdb_max_open_files, &rocksdb_cache);
|
|
||||||
|
|
||||||
let cfs = DBWithThreadMode::<MultiThreaded>::list_cf(
|
let cfs =
|
||||||
&db_opts,
|
DBWithThreadMode::<MultiThreaded>::list_cf(&db_opts, &config.path)
|
||||||
&config.database.path,
|
.map(|x| x.into_iter().collect::<HashSet<_>>())
|
||||||
)
|
.unwrap_or_default();
|
||||||
.map(|x| x.into_iter().collect::<HashSet<_>>())
|
|
||||||
.unwrap_or_default();
|
|
||||||
|
|
||||||
let db = DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
|
let db = DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
|
||||||
&db_opts,
|
&db_opts,
|
||||||
&config.database.path,
|
&config.path,
|
||||||
cfs.iter().map(|name| {
|
cfs.iter().map(|name| {
|
||||||
ColumnFamilyDescriptor::new(
|
ColumnFamilyDescriptor::new(
|
||||||
name,
|
name,
|
||||||
db_options(
|
db_options(config.max_open_files, &rocksdb_cache),
|
||||||
config.database.rocksdb_max_open_files,
|
|
||||||
&rocksdb_cache,
|
|
||||||
),
|
|
||||||
)
|
)
|
||||||
}),
|
}),
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
Ok(Engine {
|
Ok(Engine {
|
||||||
rocks: db,
|
rocks: db,
|
||||||
max_open_files: config.database.rocksdb_max_open_files,
|
max_open_files: config.max_open_files,
|
||||||
cache: rocksdb_cache,
|
cache: rocksdb_cache,
|
||||||
old_cfs: cfs,
|
old_cfs: cfs,
|
||||||
new_cfs: Mutex::default(),
|
new_cfs: Mutex::default(),
|
||||||
|
|
|
||||||
|
|
@ -13,7 +13,7 @@ use thread_local::ThreadLocal;
|
||||||
use tracing::debug;
|
use tracing::debug;
|
||||||
|
|
||||||
use super::{watchers::Watchers, KeyValueDatabaseEngine, KvTree};
|
use super::{watchers::Watchers, KeyValueDatabaseEngine, KvTree};
|
||||||
use crate::{database::Config, Result};
|
use crate::{config::SqliteConfig, Result};
|
||||||
|
|
||||||
thread_local! {
|
thread_local! {
|
||||||
static READ_CONNECTION: RefCell<Option<&'static Connection>> =
|
static READ_CONNECTION: RefCell<Option<&'static Connection>> =
|
||||||
|
|
@ -63,39 +63,26 @@ pub(crate) struct Engine {
|
||||||
read_iterator_conn_tls: ThreadLocal<Connection>,
|
read_iterator_conn_tls: ThreadLocal<Connection>,
|
||||||
|
|
||||||
path: PathBuf,
|
path: PathBuf,
|
||||||
cache_size_per_thread: u32,
|
cache_capacity_kilobytes: u32,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Engine {
|
impl Engine {
|
||||||
pub(crate) fn open(config: &Config) -> Result<Self> {
|
pub(crate) fn open(config: &SqliteConfig) -> Result<Self> {
|
||||||
fs::create_dir_all(&config.database.path)?;
|
fs::create_dir_all(&config.path)?;
|
||||||
|
|
||||||
let path = Path::new(&config.database.path).join("sqlite.db");
|
let path = config.path.join("sqlite.db");
|
||||||
|
|
||||||
// calculates cache-size per permanent connection
|
let writer = Mutex::new(Engine::prepare_conn(
|
||||||
// 1. convert MB to KiB
|
&path,
|
||||||
// 2. divide by permanent connections + permanent iter connections +
|
config.cache_capacity_kilobytes,
|
||||||
// write connection
|
)?);
|
||||||
// 3. round down to nearest integer
|
|
||||||
#[allow(
|
|
||||||
clippy::as_conversions,
|
|
||||||
clippy::cast_possible_truncation,
|
|
||||||
clippy::cast_precision_loss,
|
|
||||||
clippy::cast_sign_loss
|
|
||||||
)]
|
|
||||||
let cache_size_per_thread =
|
|
||||||
((config.database.cache_capacity_mb * 1024.0)
|
|
||||||
/ ((num_cpus::get() as f64 * 2.0) + 1.0)) as u32;
|
|
||||||
|
|
||||||
let writer =
|
|
||||||
Mutex::new(Engine::prepare_conn(&path, cache_size_per_thread)?);
|
|
||||||
|
|
||||||
Ok(Engine {
|
Ok(Engine {
|
||||||
writer,
|
writer,
|
||||||
read_conn_tls: ThreadLocal::new(),
|
read_conn_tls: ThreadLocal::new(),
|
||||||
read_iterator_conn_tls: ThreadLocal::new(),
|
read_iterator_conn_tls: ThreadLocal::new(),
|
||||||
path,
|
path,
|
||||||
cache_size_per_thread,
|
cache_capacity_kilobytes: config.cache_capacity_kilobytes,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -121,13 +108,15 @@ impl Engine {
|
||||||
|
|
||||||
fn read_lock(&self) -> &Connection {
|
fn read_lock(&self) -> &Connection {
|
||||||
self.read_conn_tls.get_or(|| {
|
self.read_conn_tls.get_or(|| {
|
||||||
Self::prepare_conn(&self.path, self.cache_size_per_thread).unwrap()
|
Self::prepare_conn(&self.path, self.cache_capacity_kilobytes)
|
||||||
|
.unwrap()
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
fn read_lock_iterator(&self) -> &Connection {
|
fn read_lock_iterator(&self) -> &Connection {
|
||||||
self.read_iterator_conn_tls.get_or(|| {
|
self.read_iterator_conn_tls.get_or(|| {
|
||||||
Self::prepare_conn(&self.path, self.cache_size_per_thread).unwrap()
|
Self::prepare_conn(&self.path, self.cache_capacity_kilobytes)
|
||||||
|
.unwrap()
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -609,7 +609,7 @@ impl Service {
|
||||||
|
|
||||||
pub(crate) fn get_media_folder(&self) -> PathBuf {
|
pub(crate) fn get_media_folder(&self) -> PathBuf {
|
||||||
let mut r = PathBuf::new();
|
let mut r = PathBuf::new();
|
||||||
r.push(self.config.database.path.clone());
|
r.push(self.config.database.path());
|
||||||
r.push("media");
|
r.push("media");
|
||||||
r
|
r
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue