Add integrity check command

This commit is contained in:
Lambda 2024-09-22 10:00:59 +00:00
parent d1370f9834
commit d93d16d7a3
6 changed files with 235 additions and 2 deletions

9
Cargo.lock generated
View file

@ -966,6 +966,7 @@ dependencies = [
"thread_local",
"tikv-jemallocator",
"tokio",
"tokio-util",
"toml",
"tower 0.5.2",
"tower-http",
@ -1002,6 +1003,12 @@ version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888"
[[package]]
name = "hashbrown"
version = "0.14.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1"
[[package]]
name = "hashbrown"
version = "0.15.2"
@ -3420,6 +3427,8 @@ dependencies = [
"bytes",
"futures-core",
"futures-sink",
"futures-util",
"hashbrown 0.14.5",
"pin-project-lite",
"tokio",
]

View file

@ -135,6 +135,7 @@ thiserror = "2.0.12"
thread_local = "1.1.8"
tikv-jemallocator = { version = "0.6.0", features = ["unprefixed_malloc_on_supported_platforms"], optional = true }
tokio = { version = "1.44.1", features = ["fs", "macros", "signal", "sync"] }
tokio-util = { version = "0.7.12", features = ["rt"] }
toml = "0.8.20"
tower = { version = "0.5.2", features = ["util"] }
tower-http = { version = "0.6.2", features = ["add-extension", "cors", "sensitive-headers", "trace", "util"] }

151
src/integrity.rs Normal file
View file

@ -0,0 +1,151 @@
use thiserror::Error;
use crate::{
database::{abstraction::KvTree, KeyValueDatabase},
utils::error::Result,
};
/// Interface for running consistency checks over a whole database.
///
/// `Sync` is a supertrait; the admin service stores this as a
/// `&'static dyn CheckIntegrity` and uses it from a blocking task.
pub(crate) trait CheckIntegrity: Sync {
/// Returns an iterator over the outcome of the integrity checks.
///
/// An `Ok(IntegrityError)` item describes one inconsistency that was found.
/// An `Err` item is a failure of the check itself (in this module: a failed
/// tree lookup) rather than a finding.
fn check_integrity(
&'static self,
) -> Box<dyn Iterator<Item = Result<IntegrityError>>>;
}
/// A single inconsistency reported by an integrity check.
#[derive(Debug, Error)]
pub(crate) enum IntegrityError {
/// Two trees that are expected to mirror each other disagree.
///
/// Displayed transparently, i.e. with the message of the wrapped
/// [`SymmetryError`].
#[error(transparent)]
Symmetry(#[from] SymmetryError),
/// A key of tree `x_name` does not exist as a key in tree `y_name`.
#[error("key {x_key:?} in {x_name} is not a key in {y_name}")]
BadForeignKey {
/// Name of the tree that contains the dangling key.
x_name: &'static str,
/// The key that has no counterpart.
x_key: Vec<u8>,
/// Name of the tree in which the key was expected to exist.
y_name: &'static str,
},
}
/// Ways in which an entry of tree `x` can fail to be mirrored by tree `y`.
///
/// In both variants `x_key` maps to `y_key` in the tree named `x_name`, and
/// `y_key` is the key that was looked up in the tree named `y_name`.
#[derive(Debug, Error)]
pub(crate) enum SymmetryError {
/// `y_key` does not exist in `y_name` at all.
#[error(
"missing key {y_key:?} in {y_name} (referenced by key {x_key:?} in \
{x_name})"
)]
MissingKey {
/// Name of the tree holding the referencing entry.
x_name: &'static str,
/// Key of the referencing entry.
x_key: Vec<u8>,
/// Name of the tree that was expected to hold the reverse entry.
y_name: &'static str,
/// Value of the referencing entry, i.e. the key that is missing.
y_key: Vec<u8>,
},
/// `y_key` exists in `y_name`, but its value is `y_value` instead of
/// `x_key`.
#[error(
"key {x_key:?} in {x_name} points to {y_key:?} in {y_name}, but that \
points to {y_value:?}"
)]
Incoherent {
/// Name of the tree holding the referencing entry.
x_name: &'static str,
/// Key of the referencing entry.
x_key: Vec<u8>,
/// Name of the tree holding the reverse entry.
y_name: &'static str,
/// Value of the referencing entry, i.e. the key of the reverse entry.
y_key: Vec<u8>,
/// Value actually stored under `y_key`; differs from `x_key`.
y_value: Vec<u8>,
},
}
/// A borrowed key-value tree paired with the name used for it in reports.
///
/// Only holds a `&'static str` and a shared reference, so it is `Copy` and
/// can be handed to several checks.
#[derive(Clone, Copy)]
struct NamedTree<'db> {
/// Name of the tree as it appears in [`IntegrityError`] messages.
name: &'static str,
/// The tree itself.
tree: &'db dyn KvTree,
}
/// Builds a [`NamedTree`] from the field `$tree` of `$db`.
///
/// The tree's name is the field identifier itself (via `stringify!`), so the
/// names in error messages always match the database field names.
macro_rules! tree {
($db:expr, $tree:ident) => {
NamedTree {
name: stringify!($tree),
// Dereference the field and reborrow it as the `&dyn KvTree` that
// `NamedTree` stores.
tree: &*$db.$tree,
}
};
}
fn check_symmetric<'db>(
x: NamedTree<'db>,
y: NamedTree<'db>,
) -> impl Iterator<Item = Result<IntegrityError>> + 'db {
x.tree.iter().filter_map(move |(x_key, y_key)| match y.tree.get(&y_key) {
Err(e) => Some(Err(e)),
Ok(Some(y_value)) if y_value == x_key => None,
Ok(Some(y_value)) => Some(Ok(SymmetryError::Incoherent {
x_name: x.name,
x_key,
y_name: y.name,
y_key,
y_value,
}
.into())),
Ok(None) => Some(Ok(SymmetryError::MissingKey {
x_name: x.name,
x_key,
y_name: y.name,
y_key,
}
.into())),
})
}
/// Checks that `x` and `y` mirror each other in both directions.
///
/// Yields the findings of `check_symmetric(x, y)` first, followed by those of
/// `check_symmetric(y, x)`.
fn check_symmetric_both<'db>(
    x: NamedTree<'db>,
    y: NamedTree<'db>,
) -> impl Iterator<Item = Result<IntegrityError>> + 'db {
    let forward = check_symmetric(x, y);
    let backward = check_symmetric(y, x);
    forward.chain(backward)
}
/// Checks that every key of `subset` also exists as a key in `primary`.
///
/// Values of `subset` are ignored. A key without a counterpart is yielded as
/// [`IntegrityError::BadForeignKey`]; a failed lookup in `primary` is yielded
/// as `Err`.
fn check_foreign_key<'db>(
    primary: NamedTree<'db>,
    subset: NamedTree<'db>,
) -> impl Iterator<Item = Result<IntegrityError>> + 'db {
    subset.tree.iter().filter_map(move |(x_key, _value)| {
        let present = match primary.tree.get(&x_key) {
            Ok(hit) => hit.is_some(),
            Err(e) => return Some(Err(e)),
        };
        if present {
            return None;
        }

        Some(Ok(IntegrityError::BadForeignKey {
            x_name: subset.name,
            x_key,
            y_name: primary.name,
        }))
    })
}
/// Integrity checks for the short-ID trees of `database`.
///
/// The returned iterator yields, in this order:
///
/// 1. two-way symmetry findings for `shorteventid_eventid` and
///    `eventid_shorteventid`;
/// 2. foreign-key findings for the keys of `shorteventid_shortstatehash` and
///    then `shorteventid_authchain`, each against `shorteventid_eventid`;
/// 3. two-way symmetry findings for `shortstatekey_statekey` and
///    `statekey_shortstatekey`.
fn short_ids(
    database: &'static KeyValueDatabase,
) -> impl Iterator<Item = Result<IntegrityError>> {
    // `NamedTree` is `Copy`, so this one handle serves every check that
    // refers to the short event ID -> event ID tree.
    let event_ids = tree!(database, shorteventid_eventid);

    let eventid_symmetry =
        check_symmetric_both(event_ids, tree!(database, eventid_shorteventid));

    let keyed_by_short_event_id = [
        tree!(database, shorteventid_shortstatehash),
        tree!(database, shorteventid_authchain),
    ];
    let eventid_foreign = keyed_by_short_event_id
        .into_iter()
        .flat_map(move |subset| check_foreign_key(event_ids, subset));

    let statekey_symmetry = check_symmetric_both(
        tree!(database, shortstatekey_statekey),
        tree!(database, statekey_shortstatekey),
    );

    eventid_symmetry.chain(eventid_foreign).chain(statekey_symmetry)
}
impl CheckIntegrity for KeyValueDatabase {
    /// Runs the integrity checks implemented for this database, which
    /// currently are the short-ID checks of `short_ids`.
    fn check_integrity(
        &'static self,
    ) -> Box<dyn Iterator<Item = Result<IntegrityError>>> {
        let checks = short_ids(self);
        Box::new(checks)
    }
}

View file

@ -14,6 +14,7 @@ mod cli;
mod config;
mod database;
mod error;
mod integrity;
mod observability;
mod service;
mod utils;

View file

@ -52,6 +52,7 @@ impl Services {
+ key_backups::Data
+ media::Data
+ sending::Data
+ crate::integrity::CheckIntegrity
+ 'static,
>(
db: &'static D,
@ -115,7 +116,7 @@ impl Services {
uiaa: uiaa::Service::new(db),
users: users::Service::new(db),
account_data: db,
admin: admin::Service::new(),
admin: admin::Service::new(db),
key_backups: db,
media: media::Service {
db,

View file

@ -28,11 +28,14 @@ use ruma::{
};
use serde_json::value::to_raw_value;
use tokio::sync::{mpsc, Mutex, RwLock};
use tokio_util::task::AbortOnDropHandle;
use tracing::warn;
use super::pdu::PduBuilder;
use crate::{
api::client_server::{leave_all_rooms, AUTO_GEN_PASSWORD_LENGTH},
error::DisplayWithSources,
integrity::CheckIntegrity,
services,
utils::{self, dbg_truncate_str, room_version::RoomVersion},
Error, PduEvent, Result,
@ -209,6 +212,9 @@ enum AdminCommand {
#[command(subcommand)]
cmd: TracingFilterCommand,
},
/// Check database integrity
CheckIntegrity,
}
#[derive(Debug, Subcommand)]
@ -249,6 +255,7 @@ pub(crate) enum AdminRoomEvent {
}
/// State of the admin service.
pub(crate) struct Service {
/// Database handle used by the `CheckIntegrity` admin command to run the
/// integrity checks.
pub(crate) db: &'static dyn CheckIntegrity,
/// Sending half of the unbounded [`AdminRoomEvent`] channel.
pub(crate) sender: mpsc::UnboundedSender<AdminRoomEvent>,
/// Receiving half of the same channel, guarded by a `Mutex`.
receiver: Mutex<mpsc::UnboundedReceiver<AdminRoomEvent>>,
}
@ -261,9 +268,10 @@ enum TracingBackend {
}
impl Service {
pub(crate) fn new() -> Arc<Self> {
pub(crate) fn new(db: &'static dyn CheckIntegrity) -> Arc<Self> {
let (sender, receiver) = mpsc::unbounded_channel();
Arc::new(Self {
db,
sender,
receiver: Mutex::new(receiver),
})
@ -1247,6 +1255,68 @@ impl Service {
"Filter reloaded",
));
}
AdminCommand::CheckIntegrity => {
    // Maximum number of integrity errors listed in the reply.
    const MAX_REPORTED: usize = 50;

    let (tx, mut rx) = mpsc::channel(16);

    // `check_integrity` returns a synchronous iterator, so it is driven
    // on the blocking pool and findings are streamed back through the
    // bounded channel.
    let task = {
        let db = self.db;
        tokio::task::spawn_blocking(move || {
            for err in db.check_integrity() {
                let Ok(()) = tx.blocking_send(err?) else {
                    // channel has been closed
                    return Ok::<_, Error>(());
                };
            }
            Ok(())
        })
    };
    let task = AbortOnDropHandle::new(task);

    // Collect up to MAX_REPORTED findings, then try to receive one more.
    // That extra item is the only reliable evidence of further findings:
    // `task.is_finished()` may still be false just after the sender is
    // dropped, and may already be true while unread findings remain in
    // the channel buffer.
    let mut errors = Vec::new();
    let mut truncated = false;
    while let Some(error) = rx.recv().await {
        if errors.len() == MAX_REPORTED {
            truncated = true;
            break;
        }
        errors.push(error);
    }

    let mut message = String::new();
    for error in &errors {
        writeln!(
            message,
            "- {}",
            DisplayWithSources {
                error,
                infix: "\n    - caused by: "
            }
        )
        .unwrap();
    }

    if truncated {
        // `rx` is dropped at the end of this arm, so the checker's next
        // `blocking_send` fails and the blocking task returns.
        writeln!(message, "...more errors not shown").unwrap();
    } else {
        // The channel is closed, so the checker has returned or is about
        // to. Awaiting it is prompt and surfaces a database error or a
        // panic instead of silently dropping it.
        match task.await {
            Ok(Ok(())) => {
                if errors.is_empty() {
                    writeln!(message, "No errors were found")
                        .unwrap();
                }
            }
            Ok(Err(e)) => return Err(e),
            Err(e) => {
                writeln!(
                    message,
                    "An error occured in the validity checking \
                     task, results may be incorrect: {e}"
                )
                .unwrap();
            }
        }
    }

    RoomMessageEventContent::text_plain(message)
}
};
Ok(reply_message_content)