Mirror of https://gitlab.computer.surgery/matrix/grapevine.git, synced 2025-12-16 15:21:24 +01:00
remove database's dependency on entire Config
This makes it easier to construct databases ad hoc, e.g. in a CLI tool that operates on multiple database instances.
parent 14b0769a3e
commit b3d9cd5e9c
5 changed files with 89 additions and 54 deletions
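
To make the motivation concrete, here is a minimal sketch of the ad-hoc construction this unlocks, using the `load_or_create` signature introduced below. Only the function signature comes from this commit; the tool shape, paths, and tuning values are invented for illustration.

    // Hypothetical CLI sketch: open two database instances side by side
    // without assembling a full `Config` for each. `KeyValueDatabase`,
    // `DatabaseBackend`, and `Result` are the crate's own items; every
    // literal value below is illustrative, not a recommended default.
    use std::path::Path;

    fn open_instance(dir: &str) -> Result<KeyValueDatabase> {
        KeyValueDatabase::load_or_create(
            DatabaseBackend::Sqlite, // backend
            Path::new(dir),          // path
            300.0,                   // cache_capacity_mb
            1000,                    // rocksdb_max_open_files (unused by sqlite)
            150_000,                 // pdu_cache_capacity
            1.0,                     // cache_capacity_modifier
            false,                   // conduit_compat
        )
    }

    fn compare() -> Result<()> {
        // No `Config` needed: each instance is described by plain values.
        let a = open_instance("/var/lib/grapevine-a")?;
        let b = open_instance("/var/lib/grapevine-b")?;
        // ... diff or migrate data between `a` and `b` here ...
        Ok(())
    }
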
@@ -1,6 +1,6 @@
 use std::{
-    collections::HashSet, future::Future, net::SocketAddr, sync::atomic,
-    time::Duration,
+    collections::HashSet, future::Future, net::SocketAddr, path::Path,
+    sync::atomic, time::Duration,
 };

 use axum::{
@@ -74,8 +74,16 @@ pub(crate) async fn run(args: ServeArgs) -> Result<(), error::ServeCommand> {
     info!("Loading database");
     let db = Box::leak(Box::new(
-        KeyValueDatabase::load_or_create(&config)
-            .map_err(Error::DatabaseError)?,
+        KeyValueDatabase::load_or_create(
+            config.database.backend,
+            Path::new(&config.database.path),
+            config.database.cache_capacity_mb,
+            config.database.rocksdb_max_open_files,
+            config.pdu_cache_capacity,
+            config.cache_capacity_modifier,
+            config.conduit_compat,
+        )
+        .map_err(Error::DatabaseError)?,
     ));

     Services::build(db, config, reload_handles)
@@ -242,7 +242,7 @@ pub(crate) struct DatabaseConfig {
     pub(crate) path: String,
     #[serde(default = "default_db_cache_capacity_mb")]
     pub(crate) cache_capacity_mb: f64,
-    #[cfg(feature = "rocksdb")]
+    #[cfg_attr(not(feature = "rocksdb"), allow(dead_code))]
     #[serde(default = "default_rocksdb_max_open_files")]
     pub(crate) rocksdb_max_open_files: i32,
 }
@@ -383,7 +383,7 @@ fn default_cache_capacity_modifier() -> f64 {
     1.0
 }

-#[cfg(feature = "rocksdb")]
+#[cfg_attr(not(feature = "rocksdb"), allow(dead_code))]
 fn default_rocksdb_max_open_files() -> i32 {
     1000
 }
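
As an aside, the serde defaults above mean a config file only has to name the backend and path; a hedged sketch of deserializing `DatabaseConfig` on its own (the TOML field spellings and the use of the `toml` crate are assumptions for illustration, not part of this commit):

    // Assumed sketch: `DatabaseConfig` carries serde attributes above, so
    // fields omitted from the input fall back to their default functions.
    fn parse() -> Result<(), toml::de::Error> {
        let cfg: DatabaseConfig = toml::from_str(
            r#"
                backend = "sqlite"
                path = "/var/lib/grapevine"
            "#,
        )?;
        // Filled in by default_rocksdb_max_open_files.
        assert_eq!(cfg.rocksdb_max_open_files, 1000);
        Ok(())
    }
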
@@ -32,7 +32,7 @@ use crate::{
             timeline::PduCount,
         },
     },
-    services, utils, Config, Error, PduEvent, Result,
+    services, utils, Error, PduEvent, Result,
 };

 pub(crate) struct KeyValueDatabase {
@@ -256,13 +256,15 @@ pub(crate) struct KeyValueDatabase {
 }

 impl KeyValueDatabase {
-    fn check_db_setup(config: &Config) -> Result<()> {
-        let path = Path::new(&config.database.path);
-
+    fn check_db_setup(
+        backend: DatabaseBackend,
+        path: &Path,
+        conduit_compat: bool,
+    ) -> Result<()> {
         let sqlite_exists = path
             .join(format!(
                 "{}.db",
-                if config.conduit_compat {
+                if conduit_compat {
                     "conduit"
                 } else {
                     "grapevine"
@@ -287,7 +289,7 @@ impl KeyValueDatabase {
         }

         let (backend_is_rocksdb, backend_is_sqlite): (bool, bool) =
-            match config.database.backend {
+            match backend {
                 #[cfg(feature = "rocksdb")]
                 DatabaseBackend::Rocksdb => (true, false),
                 #[cfg(feature = "sqlite")]
@@ -312,17 +314,21 @@ impl KeyValueDatabase {
     }

     pub(crate) fn load_or_create_engine(
-        config: &Config,
+        backend: DatabaseBackend,
+        path: &Path,
+        cache_capacity_mb: f64,
+        rocksdb_max_open_files: i32,
+        conduit_compat: bool,
     ) -> Result<Arc<dyn KeyValueDatabaseEngine>> {
         #[cfg(not(any(feature = "rocksdb", feature = "sqlite")))]
         return Err(Error::BadConfig(
             "Compiled without support for any databases",
         ));

-        Self::check_db_setup(config)?;
+        Self::check_db_setup(backend, path, conduit_compat)?;

-        if !Path::new(&config.database.path).exists() {
-            fs::create_dir_all(&config.database.path).map_err(|_| {
+        if !Path::new(path).exists() {
+            fs::create_dir_all(path).map_err(|_| {
                 Error::BadConfig(
                     "Database folder doesn't exists and couldn't be created \
                      (e.g. due to missing permissions). Please create the \
@@ -331,14 +337,22 @@ impl KeyValueDatabase {
             })?;
         }

-        let x: Arc<dyn KeyValueDatabaseEngine> = match config.database.backend {
+        let x: Arc<dyn KeyValueDatabaseEngine> = match backend {
             #[cfg(feature = "sqlite")]
             DatabaseBackend::Sqlite => {
-                Arc::new(Arc::new(abstraction::sqlite::Engine::open(config)?))
+                Arc::new(Arc::new(abstraction::sqlite::Engine::open(
+                    path,
+                    cache_capacity_mb,
+                    conduit_compat,
+                )?))
             }
             #[cfg(feature = "rocksdb")]
             DatabaseBackend::Rocksdb => {
-                Arc::new(Arc::new(abstraction::rocksdb::Engine::open(config)?))
+                Arc::new(Arc::new(abstraction::rocksdb::Engine::open(
+                    path,
+                    cache_capacity_mb,
+                    rocksdb_max_open_files,
+                )?))
             }
         };

@@ -351,8 +365,22 @@ impl KeyValueDatabase {
         allow(unreachable_code)
     )]
     #[allow(clippy::too_many_lines)]
-    pub(crate) fn load_or_create(config: &Config) -> Result<KeyValueDatabase> {
-        let builder = Self::load_or_create_engine(config)?;
+    pub(crate) fn load_or_create(
+        backend: DatabaseBackend,
+        path: &Path,
+        cache_capacity_mb: f64,
+        rocksdb_max_open_files: i32,
+        pdu_cache_capacity: u32,
+        cache_capacity_modifier: f64,
+        conduit_compat: bool,
+    ) -> Result<KeyValueDatabase> {
+        let builder = Self::load_or_create_engine(
+            backend,
+            path,
+            cache_capacity_mb,
+            rocksdb_max_open_files,
+            conduit_compat,
+        )?;

         let db = Self {
             db: builder.clone(),
@@ -480,8 +508,7 @@ impl KeyValueDatabase {
             server_signingkeys: builder.open_tree("server_signingkeys")?,

             pdu_cache: Mutex::new(LruCache::new(
-                config
-                    .pdu_cache_capacity
+                pdu_cache_capacity
                     .try_into()
                     .expect("pdu cache capacity fits into usize"),
             )),
@@ -491,7 +518,7 @@ impl KeyValueDatabase {
                 clippy::cast_possible_truncation
             )]
             auth_chain_cache: Mutex::new(LruCache::new(
-                (100_000.0 * config.cache_capacity_modifier) as usize,
+                (100_000.0 * cache_capacity_modifier) as usize,
             )),
             #[allow(
                 clippy::as_conversions,
@@ -499,7 +526,7 @@ impl KeyValueDatabase {
                 clippy::cast_possible_truncation
             )]
             shorteventid_cache: Mutex::new(LruCache::new(
-                (100_000.0 * config.cache_capacity_modifier) as usize,
+                (100_000.0 * cache_capacity_modifier) as usize,
             )),
             #[allow(
                 clippy::as_conversions,
@@ -507,7 +534,7 @@ impl KeyValueDatabase {
                 clippy::cast_possible_truncation
             )]
             eventidshort_cache: Mutex::new(LruCache::new(
-                (100_000.0 * config.cache_capacity_modifier) as usize,
+                (100_000.0 * cache_capacity_modifier) as usize,
             )),
             #[allow(
                 clippy::as_conversions,
@@ -515,7 +542,7 @@ impl KeyValueDatabase {
                 clippy::cast_possible_truncation
             )]
             shortstatekey_cache: Mutex::new(LruCache::new(
-                (100_000.0 * config.cache_capacity_modifier) as usize,
+                (100_000.0 * cache_capacity_modifier) as usize,
             )),
             #[allow(
                 clippy::as_conversions,
@@ -523,7 +550,7 @@ impl KeyValueDatabase {
                 clippy::cast_possible_truncation
             )]
             statekeyshort_cache: Mutex::new(LruCache::new(
-                (100_000.0 * config.cache_capacity_modifier) as usize,
+                (100_000.0 * cache_capacity_modifier) as usize,
             )),
             our_real_users_cache: RwLock::new(HashMap::new()),
             appservice_in_room_cache: RwLock::new(HashMap::new()),
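
With the signatures above, the engine layer can also be opened directly when a tool only needs raw tree access; a minimal sketch under those signatures (the scratch path, sizes, and the iteration call are illustrative assumptions):

    // Sketch: open just the storage engine, skipping the full
    // `KeyValueDatabase` wrapper. `load_or_create_engine` and `open_tree`
    // appear in this diff; `iter()` on the tree is assumed for the example.
    use std::path::Path;

    fn count_signing_keys() -> Result<()> {
        let engine = KeyValueDatabase::load_or_create_engine(
            DatabaseBackend::Sqlite,
            Path::new("/tmp/grapevine-scratch"),
            64.0,  // cache_capacity_mb
            512,   // rocksdb_max_open_files (unused by the sqlite backend)
            false, // conduit_compat
        )?;
        let tree = engine.open_tree("server_signingkeys")?;
        println!("entries: {}", tree.iter().count());
        Ok(())
    }
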
@@ -1,6 +1,7 @@
 use std::{
     collections::HashSet,
     future::Future,
+    path::Path,
     pin::Pin,
     sync::{Arc, Mutex, RwLock},
 };
@@ -13,9 +14,7 @@ use rocksdb::{
 };
 use tracing::Level;

-use super::{
-    super::Config, watchers::Watchers, KeyValueDatabaseEngine, KvTree,
-};
+use super::{watchers::Watchers, KeyValueDatabaseEngine, KvTree};
 use crate::{utils, Result};

 pub(crate) struct Engine {
@@ -71,43 +70,40 @@ fn db_options(max_open_files: i32, rocksdb_cache: &Cache) -> Options {
 }

 impl Engine {
-    pub(crate) fn open(config: &Config) -> Result<Self> {
+    pub(crate) fn open(
+        path: &Path,
+        cache_capacity_mb: f64,
+        max_open_files: i32,
+    ) -> Result<Self> {
         #[allow(
             clippy::as_conversions,
             clippy::cast_sign_loss,
             clippy::cast_possible_truncation
         )]
         let cache_capacity_bytes =
-            (config.database.cache_capacity_mb * 1024.0 * 1024.0) as usize;
+            (cache_capacity_mb * 1024.0 * 1024.0) as usize;
         let rocksdb_cache = Cache::new_lru_cache(cache_capacity_bytes);

-        let db_opts =
-            db_options(config.database.rocksdb_max_open_files, &rocksdb_cache);
+        let db_opts = db_options(max_open_files, &rocksdb_cache);

-        let cfs = DBWithThreadMode::<MultiThreaded>::list_cf(
-            &db_opts,
-            &config.database.path,
-        )
-        .map(|x| x.into_iter().collect::<HashSet<_>>())
-        .unwrap_or_default();
+        let cfs = DBWithThreadMode::<MultiThreaded>::list_cf(&db_opts, path)
+            .map(|x| x.into_iter().collect::<HashSet<_>>())
+            .unwrap_or_default();

         let db = DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
             &db_opts,
-            &config.database.path,
+            path,
             cfs.iter().map(|name| {
                 ColumnFamilyDescriptor::new(
                     name,
-                    db_options(
-                        config.database.rocksdb_max_open_files,
-                        &rocksdb_cache,
-                    ),
+                    db_options(max_open_files, &rocksdb_cache),
                 )
             }),
         )?;

         Ok(Engine {
             rocks: db,
-            max_open_files: config.database.rocksdb_max_open_files,
+            max_open_files,
             cache: rocksdb_cache,
             old_cfs: cfs,
             new_cfs: Mutex::default(),
@@ -12,7 +12,7 @@ use thread_local::ThreadLocal;
 use tracing::debug;

 use super::{watchers::Watchers, KeyValueDatabaseEngine, KvTree};
-use crate::{database::Config, Result};
+use crate::Result;

 thread_local! {
     static READ_CONNECTION: RefCell<Option<&'static Connection>> =
@@ -66,10 +66,14 @@ pub(crate) struct Engine {
 }

 impl Engine {
-    pub(crate) fn open(config: &Config) -> Result<Self> {
-        let path = Path::new(&config.database.path).join(format!(
+    pub(crate) fn open(
+        path: &Path,
+        cache_capacity_mb: f64,
+        conduit_compat: bool,
+    ) -> Result<Self> {
+        let path = path.join(format!(
             "{}.db",
-            if config.conduit_compat {
+            if conduit_compat {
                 "conduit"
             } else {
                 "grapevine"
@@ -87,9 +91,9 @@ impl Engine {
             clippy::cast_precision_loss,
             clippy::cast_sign_loss
         )]
-        let cache_size_per_thread =
-            ((config.database.cache_capacity_mb * 1024.0)
-                / ((num_cpus::get() as f64 * 2.0) + 1.0)) as u32;
+        let cache_size_per_thread = ((cache_capacity_mb * 1024.0)
+            / ((num_cpus::get() as f64 * 2.0) + 1.0))
+            as u32;

         let writer =
             Mutex::new(Engine::prepare_conn(&path, cache_size_per_thread)?);
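
Taken together, both backends now expose `open` functions driven by plain values; a final sketch calling each directly, matching the signatures in the hunks above (paths and sizes are illustrative, and each call still sits behind its feature gate):

    // Sketch: direct engine construction per backend. Only the `open`
    // signatures come from this diff; the literal values do not.
    use std::path::Path;

    #[cfg(feature = "sqlite")]
    fn sqlite_engine() -> Result<abstraction::sqlite::Engine> {
        // Resolves to "<path>/grapevine.db", or "conduit.db" when
        // conduit_compat is true.
        abstraction::sqlite::Engine::open(
            Path::new("/var/lib/grapevine"),
            300.0, // cache_capacity_mb
            false, // conduit_compat
        )
    }

    #[cfg(feature = "rocksdb")]
    fn rocksdb_engine() -> Result<abstraction::rocksdb::Engine> {
        abstraction::rocksdb::Engine::open(
            Path::new("/var/lib/grapevine"),
            300.0, // cache_capacity_mb
            1000,  // max_open_files
        )
    }
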