mirror of
https://gitlab.computer.surgery/matrix/grapevine.git
synced 2025-12-17 07:41:23 +01:00
remove unused database backends
This commit is contained in:
parent
aa51acf152
commit
c765a1634d
9 changed files with 2 additions and 646 deletions
57
Cargo.lock
generated
57
Cargo.lock
generated
|
|
@ -399,7 +399,6 @@ dependencies = [
|
||||||
"opentelemetry",
|
"opentelemetry",
|
||||||
"opentelemetry-jaeger",
|
"opentelemetry-jaeger",
|
||||||
"parking_lot",
|
"parking_lot",
|
||||||
"persy",
|
|
||||||
"rand",
|
"rand",
|
||||||
"regex",
|
"regex",
|
||||||
"reqwest",
|
"reqwest",
|
||||||
|
|
@ -472,21 +471,6 @@ dependencies = [
|
||||||
"libc",
|
"libc",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "crc"
|
|
||||||
version = "3.0.1"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "86ec7a15cbe22e59248fc7eadb1907dab5ba09372595da4d73dd805ed4417dfe"
|
|
||||||
dependencies = [
|
|
||||||
"crc-catalog",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "crc-catalog"
|
|
||||||
version = "2.4.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "crc32fast"
|
name = "crc32fast"
|
||||||
version = "1.4.0"
|
version = "1.4.0"
|
||||||
|
|
@ -722,16 +706,6 @@ dependencies = [
|
||||||
"percent-encoding",
|
"percent-encoding",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "fs2"
|
|
||||||
version = "0.4.3"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "9564fc758e15025b46aa6643b1b77d047d1a56a1aea6e01002ac0c7026876213"
|
|
||||||
dependencies = [
|
|
||||||
"libc",
|
|
||||||
"winapi",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "futures"
|
name = "futures"
|
||||||
version = "0.3.30"
|
version = "0.3.30"
|
||||||
|
|
@ -1650,22 +1624,6 @@ version = "2.3.1"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e"
|
checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e"
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "persy"
|
|
||||||
version = "1.5.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "9ef4b7250ab3a90ded0e284b2633469c23ef01ea868fe7cbb64e2f0a7d6f6d02"
|
|
||||||
dependencies = [
|
|
||||||
"crc",
|
|
||||||
"data-encoding",
|
|
||||||
"fs2",
|
|
||||||
"linked-hash-map",
|
|
||||||
"rand",
|
|
||||||
"thiserror",
|
|
||||||
"unsigned-varint",
|
|
||||||
"zigzag",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "pin-project"
|
name = "pin-project"
|
||||||
version = "1.1.5"
|
version = "1.1.5"
|
||||||
|
|
@ -3090,12 +3048,6 @@ version = "0.2.10"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "ab4c90930b95a82d00dc9e9ac071b4991924390d46cbd0dfe566148667605e4b"
|
checksum = "ab4c90930b95a82d00dc9e9ac071b4991924390d46cbd0dfe566148667605e4b"
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "unsigned-varint"
|
|
||||||
version = "0.8.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "eb066959b24b5196ae73cb057f45598450d2c5f71460e98c49b738086eff9c06"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "untrusted"
|
name = "untrusted"
|
||||||
version = "0.9.0"
|
version = "0.9.0"
|
||||||
|
|
@ -3464,15 +3416,6 @@ version = "1.7.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "525b4ec142c6b68a2d10f01f7bbf6755599ca3f81ea53b8431b7dd348f5fdb2d"
|
checksum = "525b4ec142c6b68a2d10f01f7bbf6755599ca3f81ea53b8431b7dd348f5fdb2d"
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "zigzag"
|
|
||||||
version = "0.1.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "70b40401a28d86ce16a330b863b86fd7dbee4d7c940587ab09ab8c019f9e3fdf"
|
|
||||||
dependencies = [
|
|
||||||
"num-traits",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "zstd-sys"
|
name = "zstd-sys"
|
||||||
version = "2.0.9+zstd.1.5.5"
|
version = "2.0.9+zstd.1.5.5"
|
||||||
|
|
|
||||||
|
|
@ -43,10 +43,6 @@ ruma = { git = "https://github.com/ruma/ruma", rev = "5495b85aa311c2805302edb0a7
|
||||||
|
|
||||||
# Async runtime and utilities
|
# Async runtime and utilities
|
||||||
tokio = { version = "1.28.1", features = ["fs", "macros", "signal", "sync"] }
|
tokio = { version = "1.28.1", features = ["fs", "macros", "signal", "sync"] }
|
||||||
# Used for storing data permanently
|
|
||||||
#sled = { version = "0.34.7", features = ["compression", "no_metrics"], optional = true }
|
|
||||||
#sled = { git = "https://github.com/spacejam/sled.git", rev = "e4640e0773595229f398438886f19bca6f7326a2", features = ["compression"] }
|
|
||||||
persy = { version = "1.4.4", optional = true, features = ["background_ops"] }
|
|
||||||
|
|
||||||
# Used for the http request / response body type for Ruma endpoints used with reqwest
|
# Used for the http request / response body type for Ruma endpoints used with reqwest
|
||||||
bytes = "1.4.0"
|
bytes = "1.4.0"
|
||||||
|
|
@ -91,7 +87,6 @@ parking_lot = { version = "0.12.1", optional = true }
|
||||||
# crossbeam = { version = "0.8.2", optional = true }
|
# crossbeam = { version = "0.8.2", optional = true }
|
||||||
num_cpus = "1.15.0"
|
num_cpus = "1.15.0"
|
||||||
threadpool = "1.8.1"
|
threadpool = "1.8.1"
|
||||||
# heed = { git = "https://github.com/timokoesters/heed.git", rev = "f6f825da7fb2c758867e05ad973ef800a6fe1d5d", optional = true }
|
|
||||||
# Used for ruma wrapper
|
# Used for ruma wrapper
|
||||||
serde_html_form = "0.2.0"
|
serde_html_form = "0.2.0"
|
||||||
|
|
||||||
|
|
@ -127,10 +122,7 @@ nix = { version = "0.28", features = ["resource"] }
|
||||||
|
|
||||||
[features]
|
[features]
|
||||||
default = ["conduit_bin", "backend_sqlite", "backend_rocksdb", "systemd"]
|
default = ["conduit_bin", "backend_sqlite", "backend_rocksdb", "systemd"]
|
||||||
#backend_sled = ["sled"]
|
|
||||||
backend_persy = ["persy", "parking_lot"]
|
|
||||||
backend_sqlite = ["sqlite"]
|
backend_sqlite = ["sqlite"]
|
||||||
#backend_heed = ["heed", "crossbeam"]
|
|
||||||
backend_rocksdb = ["rocksdb"]
|
backend_rocksdb = ["rocksdb"]
|
||||||
jemalloc = ["tikv-jemalloc-ctl", "tikv-jemallocator"]
|
jemalloc = ["tikv-jemalloc-ctl", "tikv-jemallocator"]
|
||||||
sqlite = ["rusqlite", "parking_lot", "tokio/signal"]
|
sqlite = ["rusqlite", "parking_lot", "tokio/signal"]
|
||||||
|
|
|
||||||
|
|
@ -33,7 +33,7 @@ ENV SERVER_NAME=localhost
|
||||||
ENV CONDUIT_CONFIG=/workdir/conduit.toml
|
ENV CONDUIT_CONFIG=/workdir/conduit.toml
|
||||||
|
|
||||||
RUN sed -i "s/port = 6167/port = 8008/g" conduit.toml
|
RUN sed -i "s/port = 6167/port = 8008/g" conduit.toml
|
||||||
RUN echo "log = \"warn,_=off,sled=off\"" >> conduit.toml
|
RUN echo "log = \"warn,_=off\"" >> conduit.toml
|
||||||
RUN sed -i "s/address = \"127.0.0.1\"/address = \"0.0.0.0\"/g" conduit.toml
|
RUN sed -i "s/address = \"127.0.0.1\"/address = \"0.0.0.0\"/g" conduit.toml
|
||||||
|
|
||||||
EXPOSE 8008 8448
|
EXPOSE 8008 8448
|
||||||
|
|
|
||||||
|
|
@ -3,27 +3,13 @@ use crate::Result;
|
||||||
|
|
||||||
use std::{future::Future, pin::Pin, sync::Arc};
|
use std::{future::Future, pin::Pin, sync::Arc};
|
||||||
|
|
||||||
#[cfg(feature = "sled")]
|
|
||||||
pub mod sled;
|
|
||||||
|
|
||||||
#[cfg(feature = "sqlite")]
|
#[cfg(feature = "sqlite")]
|
||||||
pub mod sqlite;
|
pub mod sqlite;
|
||||||
|
|
||||||
#[cfg(feature = "heed")]
|
|
||||||
pub mod heed;
|
|
||||||
|
|
||||||
#[cfg(feature = "rocksdb")]
|
#[cfg(feature = "rocksdb")]
|
||||||
pub mod rocksdb;
|
pub mod rocksdb;
|
||||||
|
|
||||||
#[cfg(feature = "persy")]
|
#[cfg(any(feature = "sqlite", feature = "rocksdb",))]
|
||||||
pub mod persy;
|
|
||||||
|
|
||||||
#[cfg(any(
|
|
||||||
feature = "sqlite",
|
|
||||||
feature = "rocksdb",
|
|
||||||
feature = "heed",
|
|
||||||
feature = "persy"
|
|
||||||
))]
|
|
||||||
pub mod watchers;
|
pub mod watchers;
|
||||||
|
|
||||||
pub trait KeyValueDatabaseEngine: Send + Sync {
|
pub trait KeyValueDatabaseEngine: Send + Sync {
|
||||||
|
|
|
||||||
|
|
@ -1,194 +0,0 @@
|
||||||
use super::{super::Config, watchers::Watchers};
|
|
||||||
use crossbeam::channel::{bounded, Sender as ChannelSender};
|
|
||||||
use threadpool::ThreadPool;
|
|
||||||
|
|
||||||
use crate::{Error, Result};
|
|
||||||
use std::{
|
|
||||||
future::Future,
|
|
||||||
pin::Pin,
|
|
||||||
sync::{Arc, Mutex},
|
|
||||||
};
|
|
||||||
|
|
||||||
use super::{DatabaseEngine, Tree};
|
|
||||||
|
|
||||||
type TupleOfBytes = (Vec<u8>, Vec<u8>);
|
|
||||||
|
|
||||||
pub struct Engine {
|
|
||||||
env: heed::Env,
|
|
||||||
iter_pool: Mutex<ThreadPool>,
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct EngineTree {
|
|
||||||
engine: Arc<Engine>,
|
|
||||||
tree: Arc<heed::UntypedDatabase>,
|
|
||||||
watchers: Watchers,
|
|
||||||
}
|
|
||||||
|
|
||||||
fn convert_error(error: heed::Error) -> Error {
|
|
||||||
Error::HeedError {
|
|
||||||
error: error.to_string(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl DatabaseEngine for Engine {
|
|
||||||
fn open(config: &Config) -> Result<Arc<Self>> {
|
|
||||||
let mut env_builder = heed::EnvOpenOptions::new();
|
|
||||||
env_builder.map_size(1024 * 1024 * 1024 * 1024); // 1 Terabyte
|
|
||||||
env_builder.max_readers(126);
|
|
||||||
env_builder.max_dbs(128);
|
|
||||||
unsafe {
|
|
||||||
env_builder.flag(heed::flags::Flags::MdbWriteMap);
|
|
||||||
env_builder.flag(heed::flags::Flags::MdbMapAsync);
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(Arc::new(Engine {
|
|
||||||
env: env_builder
|
|
||||||
.open(&config.database_path)
|
|
||||||
.map_err(convert_error)?,
|
|
||||||
iter_pool: Mutex::new(ThreadPool::new(10)),
|
|
||||||
}))
|
|
||||||
}
|
|
||||||
|
|
||||||
fn open_tree(self: &Arc<Self>, name: &'static str) -> Result<Arc<dyn Tree>> {
|
|
||||||
// Creates the db if it doesn't exist already
|
|
||||||
Ok(Arc::new(EngineTree {
|
|
||||||
engine: Arc::clone(self),
|
|
||||||
tree: Arc::new(
|
|
||||||
self.env
|
|
||||||
.create_database(Some(name))
|
|
||||||
.map_err(convert_error)?,
|
|
||||||
),
|
|
||||||
watchers: Default::default(),
|
|
||||||
}))
|
|
||||||
}
|
|
||||||
|
|
||||||
fn flush(self: &Arc<Self>) -> Result<()> {
|
|
||||||
self.env.force_sync().map_err(convert_error)?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl EngineTree {
|
|
||||||
fn iter_from_thread(
|
|
||||||
&self,
|
|
||||||
tree: Arc<heed::UntypedDatabase>,
|
|
||||||
from: Vec<u8>,
|
|
||||||
backwards: bool,
|
|
||||||
) -> Box<dyn Iterator<Item = TupleOfBytes> + Send + Sync> {
|
|
||||||
let (s, r) = bounded::<TupleOfBytes>(100);
|
|
||||||
let engine = Arc::clone(&self.engine);
|
|
||||||
|
|
||||||
let lock = self.engine.iter_pool.lock().await;
|
|
||||||
if lock.active_count() < lock.max_count() {
|
|
||||||
lock.execute(move || {
|
|
||||||
iter_from_thread_work(tree, &engine.env.read_txn().unwrap(), from, backwards, &s);
|
|
||||||
});
|
|
||||||
} else {
|
|
||||||
std::thread::spawn(move || {
|
|
||||||
iter_from_thread_work(tree, &engine.env.read_txn().unwrap(), from, backwards, &s);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
Box::new(r.into_iter())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn iter_from_thread_work(
|
|
||||||
tree: Arc<heed::UntypedDatabase>,
|
|
||||||
txn: &heed::RoTxn<'_>,
|
|
||||||
from: Vec<u8>,
|
|
||||||
backwards: bool,
|
|
||||||
s: &ChannelSender<(Vec<u8>, Vec<u8>)>,
|
|
||||||
) {
|
|
||||||
if backwards {
|
|
||||||
for (k, v) in tree.rev_range(txn, ..=&*from).unwrap().map(|r| r.unwrap()) {
|
|
||||||
if s.send((k.to_vec(), v.to_vec())).is_err() {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if from.is_empty() {
|
|
||||||
for (k, v) in tree.iter(txn).unwrap().map(|r| r.unwrap()) {
|
|
||||||
if s.send((k.to_vec(), v.to_vec())).is_err() {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
for (k, v) in tree.range(txn, &*from..).unwrap().map(|r| r.unwrap()) {
|
|
||||||
if s.send((k.to_vec(), v.to_vec())).is_err() {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Tree for EngineTree {
|
|
||||||
fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
|
|
||||||
let txn = self.engine.env.read_txn().map_err(convert_error)?;
|
|
||||||
Ok(self
|
|
||||||
.tree
|
|
||||||
.get(&txn, &key)
|
|
||||||
.map_err(convert_error)?
|
|
||||||
.map(|s| s.to_vec()))
|
|
||||||
}
|
|
||||||
|
|
||||||
fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> {
|
|
||||||
let mut txn = self.engine.env.write_txn().map_err(convert_error)?;
|
|
||||||
self.tree
|
|
||||||
.put(&mut txn, &key, &value)
|
|
||||||
.map_err(convert_error)?;
|
|
||||||
txn.commit().map_err(convert_error)?;
|
|
||||||
self.watchers.wake(key);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn remove(&self, key: &[u8]) -> Result<()> {
|
|
||||||
let mut txn = self.engine.env.write_txn().map_err(convert_error)?;
|
|
||||||
self.tree.delete(&mut txn, &key).map_err(convert_error)?;
|
|
||||||
txn.commit().map_err(convert_error)?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + Send + 'a> {
|
|
||||||
self.iter_from(&[], false)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn iter_from(
|
|
||||||
&self,
|
|
||||||
from: &[u8],
|
|
||||||
backwards: bool,
|
|
||||||
) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + Send> {
|
|
||||||
self.iter_from_thread(Arc::clone(&self.tree), from.to_vec(), backwards)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn increment(&self, key: &[u8]) -> Result<Vec<u8>> {
|
|
||||||
let mut txn = self.engine.env.write_txn().map_err(convert_error)?;
|
|
||||||
|
|
||||||
let old = self.tree.get(&txn, &key).map_err(convert_error)?;
|
|
||||||
let new =
|
|
||||||
crate::utils::increment(old.as_deref()).expect("utils::increment always returns Some");
|
|
||||||
|
|
||||||
self.tree
|
|
||||||
.put(&mut txn, &key, &&*new)
|
|
||||||
.map_err(convert_error)?;
|
|
||||||
|
|
||||||
txn.commit().map_err(convert_error)?;
|
|
||||||
|
|
||||||
Ok(new)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn scan_prefix<'a>(
|
|
||||||
&'a self,
|
|
||||||
prefix: Vec<u8>,
|
|
||||||
) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + Send + 'a> {
|
|
||||||
Box::new(
|
|
||||||
self.iter_from(&prefix, false)
|
|
||||||
.take_while(move |(key, _)| key.starts_with(&prefix)),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn watch_prefix<'a>(&'a self, prefix: &[u8]) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
|
|
||||||
self.watchers.watch(prefix)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
@ -1,197 +0,0 @@
|
||||||
use crate::{
|
|
||||||
database::{
|
|
||||||
abstraction::{watchers::Watchers, KeyValueDatabaseEngine, KvTree},
|
|
||||||
Config,
|
|
||||||
},
|
|
||||||
Result,
|
|
||||||
};
|
|
||||||
use persy::{ByteVec, OpenOptions, Persy, Transaction, TransactionConfig, ValueMode};
|
|
||||||
|
|
||||||
use std::{future::Future, pin::Pin, sync::Arc};
|
|
||||||
|
|
||||||
use tracing::warn;
|
|
||||||
|
|
||||||
pub struct Engine {
|
|
||||||
persy: Persy,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl KeyValueDatabaseEngine for Arc<Engine> {
|
|
||||||
fn open(config: &Config) -> Result<Self> {
|
|
||||||
let mut cfg = persy::Config::new();
|
|
||||||
cfg.change_cache_size((config.db_cache_capacity_mb * 1024.0 * 1024.0) as u64);
|
|
||||||
|
|
||||||
let persy = OpenOptions::new()
|
|
||||||
.create(true)
|
|
||||||
.config(cfg)
|
|
||||||
.open(&format!("{}/db.persy", config.database_path))?;
|
|
||||||
Ok(Arc::new(Engine { persy }))
|
|
||||||
}
|
|
||||||
|
|
||||||
fn open_tree(&self, name: &'static str) -> Result<Arc<dyn KvTree>> {
|
|
||||||
// Create if it doesn't exist
|
|
||||||
if !self.persy.exists_index(name)? {
|
|
||||||
let mut tx = self.persy.begin()?;
|
|
||||||
tx.create_index::<ByteVec, ByteVec>(name, ValueMode::Replace)?;
|
|
||||||
tx.prepare()?.commit()?;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(Arc::new(PersyTree {
|
|
||||||
persy: self.persy.clone(),
|
|
||||||
name: name.to_owned(),
|
|
||||||
watchers: Watchers::default(),
|
|
||||||
}))
|
|
||||||
}
|
|
||||||
|
|
||||||
fn flush(&self) -> Result<()> {
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct PersyTree {
|
|
||||||
persy: Persy,
|
|
||||||
name: String,
|
|
||||||
watchers: Watchers,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl PersyTree {
|
|
||||||
fn begin(&self) -> Result<Transaction> {
|
|
||||||
Ok(self
|
|
||||||
.persy
|
|
||||||
.begin_with(TransactionConfig::new().set_background_sync(true))?)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl KvTree for PersyTree {
|
|
||||||
fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
|
|
||||||
let result = self
|
|
||||||
.persy
|
|
||||||
.get::<ByteVec, ByteVec>(&self.name, &ByteVec::from(key))?
|
|
||||||
.next()
|
|
||||||
.map(|v| (*v).to_owned());
|
|
||||||
Ok(result)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> {
|
|
||||||
self.insert_batch(&mut Some((key.to_owned(), value.to_owned())).into_iter())?;
|
|
||||||
self.watchers.wake(key);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn insert_batch<'a>(&self, iter: &mut dyn Iterator<Item = (Vec<u8>, Vec<u8>)>) -> Result<()> {
|
|
||||||
let mut tx = self.begin()?;
|
|
||||||
for (key, value) in iter {
|
|
||||||
tx.put::<ByteVec, ByteVec>(
|
|
||||||
&self.name,
|
|
||||||
ByteVec::from(key.clone()),
|
|
||||||
ByteVec::from(value),
|
|
||||||
)?;
|
|
||||||
}
|
|
||||||
tx.prepare()?.commit()?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn increment_batch<'a>(&self, iter: &mut dyn Iterator<Item = Vec<u8>>) -> Result<()> {
|
|
||||||
let mut tx = self.begin()?;
|
|
||||||
for key in iter {
|
|
||||||
let old = tx
|
|
||||||
.get::<ByteVec, ByteVec>(&self.name, &ByteVec::from(key.clone()))?
|
|
||||||
.next()
|
|
||||||
.map(|v| (*v).to_owned());
|
|
||||||
let new = crate::utils::increment(old.as_deref()).unwrap();
|
|
||||||
tx.put::<ByteVec, ByteVec>(&self.name, ByteVec::from(key), ByteVec::from(new))?;
|
|
||||||
}
|
|
||||||
tx.prepare()?.commit()?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn remove(&self, key: &[u8]) -> Result<()> {
|
|
||||||
let mut tx = self.begin()?;
|
|
||||||
tx.remove::<ByteVec, ByteVec>(&self.name, ByteVec::from(key), None)?;
|
|
||||||
tx.prepare()?.commit()?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
|
|
||||||
let iter = self.persy.range::<ByteVec, ByteVec, _>(&self.name, ..);
|
|
||||||
match iter {
|
|
||||||
Ok(iter) => Box::new(iter.filter_map(|(k, v)| {
|
|
||||||
v.into_iter()
|
|
||||||
.map(|val| ((*k).to_owned(), (*val).to_owned()))
|
|
||||||
.next()
|
|
||||||
})),
|
|
||||||
Err(e) => {
|
|
||||||
warn!("error iterating {:?}", e);
|
|
||||||
Box::new(std::iter::empty())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn iter_from<'a>(
|
|
||||||
&'a self,
|
|
||||||
from: &[u8],
|
|
||||||
backwards: bool,
|
|
||||||
) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
|
|
||||||
let range = if backwards {
|
|
||||||
self.persy
|
|
||||||
.range::<ByteVec, ByteVec, _>(&self.name, ..=ByteVec::from(from))
|
|
||||||
} else {
|
|
||||||
self.persy
|
|
||||||
.range::<ByteVec, ByteVec, _>(&self.name, ByteVec::from(from)..)
|
|
||||||
};
|
|
||||||
match range {
|
|
||||||
Ok(iter) => {
|
|
||||||
let map = iter.filter_map(|(k, v)| {
|
|
||||||
v.into_iter()
|
|
||||||
.map(|val| ((*k).to_owned(), (*val).to_owned()))
|
|
||||||
.next()
|
|
||||||
});
|
|
||||||
if backwards {
|
|
||||||
Box::new(map.rev())
|
|
||||||
} else {
|
|
||||||
Box::new(map)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
warn!("error iterating with prefix {:?}", e);
|
|
||||||
Box::new(std::iter::empty())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn increment(&self, key: &[u8]) -> Result<Vec<u8>> {
|
|
||||||
self.increment_batch(&mut Some(key.to_owned()).into_iter())?;
|
|
||||||
Ok(self.get(key)?.unwrap())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn scan_prefix<'a>(
|
|
||||||
&'a self,
|
|
||||||
prefix: Vec<u8>,
|
|
||||||
) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
|
|
||||||
let range_prefix = ByteVec::from(prefix.clone());
|
|
||||||
let range = self
|
|
||||||
.persy
|
|
||||||
.range::<ByteVec, ByteVec, _>(&self.name, range_prefix..);
|
|
||||||
|
|
||||||
match range {
|
|
||||||
Ok(iter) => {
|
|
||||||
let owned_prefix = prefix.clone();
|
|
||||||
Box::new(
|
|
||||||
iter.take_while(move |(k, _)| (*k).starts_with(&owned_prefix))
|
|
||||||
.filter_map(|(k, v)| {
|
|
||||||
v.into_iter()
|
|
||||||
.map(|val| ((*k).to_owned(), (*val).to_owned()))
|
|
||||||
.next()
|
|
||||||
}),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
warn!("error scanning prefix {:?}", e);
|
|
||||||
Box::new(std::iter::empty())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn watch_prefix<'a>(&'a self, prefix: &[u8]) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
|
|
||||||
self.watchers.watch(prefix)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
@ -1,127 +0,0 @@
|
||||||
use super::super::Config;
|
|
||||||
use crate::{utils, Result};
|
|
||||||
use std::{future::Future, pin::Pin, sync::Arc};
|
|
||||||
use tracing::warn;
|
|
||||||
|
|
||||||
use super::{DatabaseEngine, Tree};
|
|
||||||
|
|
||||||
pub struct Engine(sled::Db);
|
|
||||||
|
|
||||||
pub struct SledEngineTree(sled::Tree);
|
|
||||||
|
|
||||||
impl DatabaseEngine for Engine {
|
|
||||||
fn open(config: &Config) -> Result<Arc<Self>> {
|
|
||||||
Ok(Arc::new(Engine(
|
|
||||||
sled::Config::default()
|
|
||||||
.path(&config.database_path)
|
|
||||||
.cache_capacity((config.db_cache_capacity_mb * 1024.0 * 1024.0) as u64)
|
|
||||||
.use_compression(true)
|
|
||||||
.open()?,
|
|
||||||
)))
|
|
||||||
}
|
|
||||||
|
|
||||||
fn open_tree(self: &Arc<Self>, name: &'static str) -> Result<Arc<dyn Tree>> {
|
|
||||||
Ok(Arc::new(SledEngineTree(self.0.open_tree(name)?)))
|
|
||||||
}
|
|
||||||
|
|
||||||
fn flush(self: &Arc<Self>) -> Result<()> {
|
|
||||||
Ok(()) // noop
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Tree for SledEngineTree {
|
|
||||||
fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
|
|
||||||
Ok(self.0.get(key)?.map(|v| v.to_vec()))
|
|
||||||
}
|
|
||||||
|
|
||||||
fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> {
|
|
||||||
self.0.insert(key, value)?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn insert_batch<'a>(&self, iter: &mut dyn Iterator<Item = (Vec<u8>, Vec<u8>)>) -> Result<()> {
|
|
||||||
for (key, value) in iter {
|
|
||||||
self.0.insert(key, value)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn remove(&self, key: &[u8]) -> Result<()> {
|
|
||||||
self.0.remove(key)?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
|
|
||||||
Box::new(
|
|
||||||
self.0
|
|
||||||
.iter()
|
|
||||||
.filter_map(|r| {
|
|
||||||
if let Err(e) = &r {
|
|
||||||
warn!("Error: {}", e);
|
|
||||||
}
|
|
||||||
r.ok()
|
|
||||||
})
|
|
||||||
.map(|(k, v)| (k.to_vec().into(), v.to_vec().into())),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn iter_from(
|
|
||||||
&self,
|
|
||||||
from: &[u8],
|
|
||||||
backwards: bool,
|
|
||||||
) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)>> {
|
|
||||||
let iter = if backwards {
|
|
||||||
self.0.range(..=from)
|
|
||||||
} else {
|
|
||||||
self.0.range(from..)
|
|
||||||
};
|
|
||||||
|
|
||||||
let iter = iter
|
|
||||||
.filter_map(|r| {
|
|
||||||
if let Err(e) = &r {
|
|
||||||
warn!("Error: {}", e);
|
|
||||||
}
|
|
||||||
r.ok()
|
|
||||||
})
|
|
||||||
.map(|(k, v)| (k.to_vec().into(), v.to_vec().into()));
|
|
||||||
|
|
||||||
if backwards {
|
|
||||||
Box::new(iter.rev())
|
|
||||||
} else {
|
|
||||||
Box::new(iter)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn increment(&self, key: &[u8]) -> Result<Vec<u8>> {
|
|
||||||
Ok(self
|
|
||||||
.0
|
|
||||||
.update_and_fetch(key, utils::increment)
|
|
||||||
.map(|o| o.expect("increment always sets a value").to_vec())?)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn scan_prefix<'a>(
|
|
||||||
&'a self,
|
|
||||||
prefix: Vec<u8>,
|
|
||||||
) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
|
|
||||||
let iter = self
|
|
||||||
.0
|
|
||||||
.scan_prefix(prefix)
|
|
||||||
.filter_map(|r| {
|
|
||||||
if let Err(e) = &r {
|
|
||||||
warn!("Error: {}", e);
|
|
||||||
}
|
|
||||||
r.ok()
|
|
||||||
})
|
|
||||||
.map(|(k, v)| (k.to_vec().into(), v.to_vec().into()));
|
|
||||||
|
|
||||||
Box::new(iter)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn watch_prefix<'a>(&'a self, prefix: &[u8]) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
|
|
||||||
let prefix = prefix.to_vec();
|
|
||||||
Box::pin(async move {
|
|
||||||
self.0.watch_prefix(prefix).await;
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
@ -172,16 +172,11 @@ impl KeyValueDatabase {
|
||||||
fn check_db_setup(config: &Config) -> Result<()> {
|
fn check_db_setup(config: &Config) -> Result<()> {
|
||||||
let path = Path::new(&config.database_path);
|
let path = Path::new(&config.database_path);
|
||||||
|
|
||||||
let sled_exists = path.join("db").exists();
|
|
||||||
let sqlite_exists = path.join("conduit.db").exists();
|
let sqlite_exists = path.join("conduit.db").exists();
|
||||||
let rocksdb_exists = path.join("IDENTITY").exists();
|
let rocksdb_exists = path.join("IDENTITY").exists();
|
||||||
|
|
||||||
let mut count = 0;
|
let mut count = 0;
|
||||||
|
|
||||||
if sled_exists {
|
|
||||||
count += 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if sqlite_exists {
|
if sqlite_exists {
|
||||||
count += 1;
|
count += 1;
|
||||||
}
|
}
|
||||||
|
|
@ -195,12 +190,6 @@ impl KeyValueDatabase {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
if sled_exists && config.database_backend != "sled" {
|
|
||||||
return Err(Error::bad_config(
|
|
||||||
"Found sled at database_path, but is not specified in config.",
|
|
||||||
));
|
|
||||||
}
|
|
||||||
|
|
||||||
if sqlite_exists && config.database_backend != "sqlite" {
|
if sqlite_exists && config.database_backend != "sqlite" {
|
||||||
return Err(Error::bad_config(
|
return Err(Error::bad_config(
|
||||||
"Found sqlite at database_path, but is not specified in config.",
|
"Found sqlite at database_path, but is not specified in config.",
|
||||||
|
|
@ -238,12 +227,6 @@ impl KeyValueDatabase {
|
||||||
#[cfg(feature = "rocksdb")]
|
#[cfg(feature = "rocksdb")]
|
||||||
Arc::new(Arc::<abstraction::rocksdb::Engine>::open(&config)?)
|
Arc::new(Arc::<abstraction::rocksdb::Engine>::open(&config)?)
|
||||||
}
|
}
|
||||||
"persy" => {
|
|
||||||
#[cfg(not(feature = "persy"))]
|
|
||||||
return Err(Error::BadConfig("Database backend not found."));
|
|
||||||
#[cfg(feature = "persy")]
|
|
||||||
Arc::new(Arc::<abstraction::persy::Engine>::open(&config)?)
|
|
||||||
}
|
|
||||||
_ => {
|
_ => {
|
||||||
return Err(Error::BadConfig("Database backend not found."));
|
return Err(Error::BadConfig("Database backend not found."));
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -11,33 +11,18 @@ use ruma::{
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
use tracing::{error, info};
|
use tracing::{error, info};
|
||||||
|
|
||||||
#[cfg(feature = "persy")]
|
|
||||||
use persy::PersyError;
|
|
||||||
|
|
||||||
use crate::RumaResponse;
|
use crate::RumaResponse;
|
||||||
|
|
||||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||||
|
|
||||||
#[derive(Error, Debug)]
|
#[derive(Error, Debug)]
|
||||||
pub enum Error {
|
pub enum Error {
|
||||||
#[cfg(feature = "sled")]
|
|
||||||
#[error("There was a problem with the connection to the sled database.")]
|
|
||||||
SledError {
|
|
||||||
#[from]
|
|
||||||
source: sled::Error,
|
|
||||||
},
|
|
||||||
#[cfg(feature = "sqlite")]
|
#[cfg(feature = "sqlite")]
|
||||||
#[error("There was a problem with the connection to the sqlite database: {source}")]
|
#[error("There was a problem with the connection to the sqlite database: {source}")]
|
||||||
SqliteError {
|
SqliteError {
|
||||||
#[from]
|
#[from]
|
||||||
source: rusqlite::Error,
|
source: rusqlite::Error,
|
||||||
},
|
},
|
||||||
#[cfg(feature = "persy")]
|
|
||||||
#[error("There was a problem with the connection to the persy database.")]
|
|
||||||
PersyError { source: PersyError },
|
|
||||||
#[cfg(feature = "heed")]
|
|
||||||
#[error("There was a problem with the connection to the heed database: {error}")]
|
|
||||||
HeedError { error: String },
|
|
||||||
#[cfg(feature = "rocksdb")]
|
#[cfg(feature = "rocksdb")]
|
||||||
#[error("There was a problem with the connection to the rocksdb database: {source}")]
|
#[error("There was a problem with the connection to the rocksdb database: {source}")]
|
||||||
RocksDbError {
|
RocksDbError {
|
||||||
|
|
@ -157,14 +142,8 @@ impl Error {
|
||||||
let db_error = String::from("Database or I/O error occurred.");
|
let db_error = String::from("Database or I/O error occurred.");
|
||||||
|
|
||||||
match self {
|
match self {
|
||||||
#[cfg(feature = "sled")]
|
|
||||||
Self::SledError { .. } => db_error,
|
|
||||||
#[cfg(feature = "sqlite")]
|
#[cfg(feature = "sqlite")]
|
||||||
Self::SqliteError { .. } => db_error,
|
Self::SqliteError { .. } => db_error,
|
||||||
#[cfg(feature = "persy")]
|
|
||||||
Self::PersyError { .. } => db_error,
|
|
||||||
#[cfg(feature = "heed")]
|
|
||||||
Self::HeedError => db_error,
|
|
||||||
#[cfg(feature = "rocksdb")]
|
#[cfg(feature = "rocksdb")]
|
||||||
Self::RocksDbError { .. } => db_error,
|
Self::RocksDbError { .. } => db_error,
|
||||||
Self::IoError { .. } => db_error,
|
Self::IoError { .. } => db_error,
|
||||||
|
|
@ -175,15 +154,6 @@ impl Error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(feature = "persy")]
|
|
||||||
impl<T: Into<PersyError>> From<persy::PE<T>> for Error {
|
|
||||||
fn from(err: persy::PE<T>) -> Self {
|
|
||||||
Error::PersyError {
|
|
||||||
source: err.error().into(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<Infallible> for Error {
|
impl From<Infallible> for Error {
|
||||||
fn from(i: Infallible) -> Self {
|
fn from(i: Infallible) -> Self {
|
||||||
match i {}
|
match i {}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue