From 82a75dc2c1f52e7dc12d03dce46b0b876a5b9c60 Mon Sep 17 00:00:00 2001 From: Benjamin Lee Date: Fri, 30 Aug 2024 22:30:44 -0700 Subject: [PATCH] grapevine-specific db version namespace This allows us to break db compatibility, but such that the worst thing that happens if somebody tries to run our db against an incompatible server is that it exits immediately with an error. --- src/database.rs | 46 +++++++++++------- src/database/key_value/globals.rs | 78 +++++++++++++++++++++++++++---- src/service/globals.rs | 9 ++-- src/service/globals/data.rs | 70 ++++++++++++++++++++++++++- src/utils/error.rs | 3 ++ 5 files changed, 174 insertions(+), 32 deletions(-) diff --git a/src/database.rs b/src/database.rs index abfad85a..da87079c 100644 --- a/src/database.rs +++ b/src/database.rs @@ -25,6 +25,7 @@ use tracing::{debug, error, info, info_span, warn, Instrument}; use crate::{ config::DatabaseBackend, service::{ + globals::DbVersion, media::MediaFileKey, rooms::{ short::{ShortEventId, ShortStateHash, ShortStateKey}, @@ -534,11 +535,20 @@ impl KeyValueDatabase { #[allow(clippy::too_many_lines)] pub(crate) async fn apply_migrations(&self) -> Result<()> { // If the database has any data, perform data migrations before starting - let latest_database_version = 13; + let latest_database_version = DbVersion::Conduit(13); + + let current_database_version = services().globals.database_version()?; + if !current_database_version.is_compatible() { + error!(version = ?current_database_version, "current db version is unsupported"); + return Err(Error::bad_database( + "Database was last written with a conduit schema version \ + newer than we support.", + )); + } if services().users.count()? > 0 { // MIGRATIONS - migration(1, || { + migration(DbVersion::Conduit(1), || { for (roomserverid, _) in self.roomserverids.iter() { let mut parts = roomserverid.split(|&b| b == 0xFF); let room_id = @@ -556,7 +566,7 @@ impl KeyValueDatabase { Ok(()) })?; - migration(2, || { + migration(DbVersion::Conduit(2), || { // We accidentally inserted hashed versions of "" into the db // instead of just "" for (userid, password) in self.userid_password.iter() { @@ -574,7 +584,7 @@ impl KeyValueDatabase { Ok(()) })?; - migration(3, || { + migration(DbVersion::Conduit(3), || { // Move media to filesystem for (key, content) in self.mediaid_file.iter() { let key = MediaFileKey::new(key); @@ -590,7 +600,7 @@ impl KeyValueDatabase { Ok(()) })?; - migration(4, || { + migration(DbVersion::Conduit(4), || { // Add federated users to services() as deactivated for our_user in services().users.iter() { let our_user = our_user?; @@ -616,7 +626,7 @@ impl KeyValueDatabase { Ok(()) })?; - migration(5, || { + migration(DbVersion::Conduit(5), || { // Upgrade user data store for (roomuserdataid, _) in self.roomuserdataid_accountdata.iter() @@ -639,7 +649,7 @@ impl KeyValueDatabase { Ok(()) })?; - migration(6, || { + migration(DbVersion::Conduit(6), || { // Set room member count for (roomid, _) in self.roomid_shortstatehash.iter() { let string = utils::string_from_bytes(&roomid).unwrap(); @@ -652,7 +662,7 @@ impl KeyValueDatabase { Ok(()) })?; - migration(7, || { + migration(DbVersion::Conduit(7), || { // Upgrade state store let mut last_roomstates: HashMap = HashMap::new(); @@ -787,7 +797,7 @@ impl KeyValueDatabase { Ok(()) })?; - migration(8, || { + migration(DbVersion::Conduit(8), || { // Generate short room ids for all rooms for (room_id, _) in self.roomid_shortstatehash.iter() { let shortroomid = @@ -843,7 +853,7 @@ impl KeyValueDatabase { Ok(()) })?; - migration(9, || { + migration(DbVersion::Conduit(9), || { // Update tokenids db layout let mut iter = self .tokenids @@ -891,7 +901,7 @@ impl KeyValueDatabase { Ok(()) })?; - migration(10, || { + migration(DbVersion::Conduit(10), || { // Add other direction for shortstatekeys for (statekey, shortstatekey) in self.statekey_shortstatekey.iter() @@ -908,14 +918,14 @@ impl KeyValueDatabase { Ok(()) })?; - migration(11, || { + migration(DbVersion::Conduit(11), || { self.db .open_tree("userdevicesessionid_uiaarequest")? .clear()?; Ok(()) })?; - migration(12, || { + migration(DbVersion::Conduit(12), || { for username in services().users.list_local_users()? { let user = match UserId::parse_with_server_name( username.clone(), @@ -1017,7 +1027,7 @@ impl KeyValueDatabase { // This migration can be reused as-is anytime the server-default // rules are updated. - migration(13, || { + migration(DbVersion::Conduit(13), || { for username in services().users.list_local_users()? { let user = match UserId::parse_with_server_name( username.clone(), @@ -1079,7 +1089,7 @@ impl KeyValueDatabase { info!( backend = %services().globals.config.database.backend, - version = latest_database_version, + version = ?latest_database_version, "Loaded database", ); } else { @@ -1092,7 +1102,7 @@ impl KeyValueDatabase { info!( backend = %services().globals.config.database.backend, - version = latest_database_version, + version = ?latest_database_version, "Created new database", ); } @@ -1151,7 +1161,7 @@ impl KeyValueDatabase { /// If the current version is older than `new_version`, execute a migration /// function. -fn migration(new_version: u64, migration: F) -> Result<(), Error> +fn migration(new_version: DbVersion, migration: F) -> Result<(), Error> where F: FnOnce() -> Result<(), Error>, { @@ -1159,7 +1169,7 @@ where if current_version < new_version { migration()?; services().globals.bump_database_version(new_version)?; - warn!("Migration: {current_version} -> {new_version} finished"); + warn!("Migration: {current_version:?} -> {new_version:?} finished"); } Ok(()) } diff --git a/src/database/key_value/globals.rs b/src/database/key_value/globals.rs index 819ef98c..83396bab 100644 --- a/src/database/key_value/globals.rs +++ b/src/database/key_value/globals.rs @@ -11,12 +11,25 @@ use ruma::{ use crate::{ database::KeyValueDatabase, - service::{self, globals::SigningKeys}, + service::{ + self, + globals::{DbVersion, SigningKeys}, + }, services, utils, Error, Result, }; pub(crate) const COUNTER: &[u8] = b"c"; +/// Placeholder stored in `globals.version` to indicate that +/// `globals.namespaced_version` should be used instead. See the documentation +/// on [`DbVersion`] for more details. +// Allowed because there isn't another way to do the conversion in const context +#[allow(clippy::as_conversions)] +const CONDUIT_PLACEHOLDER_VERSION: u64 = i64::MAX as u64; + +/// Namespace tag for [`DbVersion::Grapevine`]. +const GRAPEVINE_VERSION_NAMESPACE: &[u8] = b"grapevine"; + #[async_trait] impl service::globals::Data for KeyValueDatabase { fn next_count(&self) -> Result { @@ -349,16 +362,63 @@ lasttimelinecount_cache: {lasttimelinecount_cache}\n" Ok(signingkeys) } - fn database_version(&self) -> Result { - self.global.get(b"version")?.map_or(Ok(0), |version| { - utils::u64_from_bytes(&version).map_err(|_| { - Error::bad_database("Database version id is invalid.") - }) - }) + fn database_version(&self) -> Result { + let version = + self.global.get(b"version")?.map_or(Ok(0), |version| { + utils::u64_from_bytes(&version).map_err(|_| { + Error::bad_database("Database version id is invalid.") + }) + })?; + + if version == CONDUIT_PLACEHOLDER_VERSION { + let value = + self.global.get(b"namespaced_version")?.ok_or_else(|| { + Error::bad_database("'namespace_version' is missing") + })?; + let mut parts = value.splitn(2, |&b| b == 0xFF); + let namespace = parts + .next() + .expect("splitn always returns at least one element"); + let version_bytes = parts.next().ok_or_else(|| { + Error::bad_database( + "Invalid 'namespaced_version' value: missing separator", + ) + })?; + let version = + utils::u64_from_bytes(version_bytes).map_err(|_| { + Error::bad_database( + "Invalid 'namespaced_version' value: version is not a \ + u64", + ) + })?; + + if namespace == GRAPEVINE_VERSION_NAMESPACE { + Ok(DbVersion::Grapevine(version)) + } else { + Err(Error::UnknownDbVersionNamespace(namespace.to_owned())) + } + } else { + Ok(DbVersion::Conduit(version)) + } } - fn bump_database_version(&self, new_version: u64) -> Result<()> { - self.global.insert(b"version", &new_version.to_be_bytes())?; + fn bump_database_version(&self, new_version: DbVersion) -> Result<()> { + match new_version { + DbVersion::Grapevine(version) => { + let mut value = GRAPEVINE_VERSION_NAMESPACE.to_vec(); + value.push(0xFF); + value.extend(&version.to_be_bytes()); + self.global.insert( + b"version", + &CONDUIT_PLACEHOLDER_VERSION.to_be_bytes(), + )?; + self.global.insert(b"namespaced_version", &value)?; + } + DbVersion::Conduit(version) => { + self.global.insert(b"version", &version.to_be_bytes())?; + self.global.remove(b"namespaced_version")?; + } + } Ok(()) } } diff --git a/src/service/globals.rs b/src/service/globals.rs index a1662c9e..05b68f20 100644 --- a/src/service/globals.rs +++ b/src/service/globals.rs @@ -15,7 +15,7 @@ use std::{ }; use base64::{engine::general_purpose, Engine as _}; -pub(crate) use data::{Data, SigningKeys}; +pub(crate) use data::{Data, DbVersion, SigningKeys}; use futures_util::FutureExt; use hyper::service::Service as _; use hyper_util::{ @@ -592,11 +592,14 @@ impl Service { }) } - pub(crate) fn database_version(&self) -> Result { + pub(crate) fn database_version(&self) -> Result { self.db.database_version() } - pub(crate) fn bump_database_version(&self, new_version: u64) -> Result<()> { + pub(crate) fn bump_database_version( + &self, + new_version: DbVersion, + ) -> Result<()> { self.db.bump_database_version(new_version) } diff --git a/src/service/globals/data.rs b/src/service/globals/data.rs index 28e7e512..efadaa92 100644 --- a/src/service/globals/data.rs +++ b/src/service/globals/data.rs @@ -1,4 +1,5 @@ use std::{ + cmp::Ordering, collections::BTreeMap, time::{Duration, SystemTime}, }; @@ -14,6 +15,70 @@ use serde::Deserialize; use crate::{services, Result}; +/// Database schema version. +/// +/// In conduit, database versions were tracked with a single integer value in +/// `globals.version`, incremented with every migration. Now, we want to make +/// our own db schema changes, which may not be compatible the schema used by +/// conduit and other conduit forks. We also want to support users moving +/// existing databases between conduit, grapevine, and other conduit forks. +/// +/// To handle this, we have namespacing for db versions. Version numbers that +/// match the upstream conduit versions are still stored directly in +/// `globals.version`. Versions that are grapevine-specific are stored in +/// `globals.namespaced_version`, with the value `"grapevine" + 0xFF + version`. +/// +/// When a non-conduit compatible version is set, the value of `globals.version` +/// is set to `i64::MAX`. This ensures that software that only knows about the +/// conduit-compatible versioning will treat the db as a unknown future version +/// and refuse to interact with it. +/// +/// The last shared version is `Conduit(13)`. Past this point, we start counting +/// at `Grapevine(0)`. +#[derive(Debug, Copy, Clone, Eq, PartialEq)] +pub(crate) enum DbVersion { + /// Version namespace shared with conduit and other conduit forks. + Conduit(u64), + /// Grapevine-specific version namespace. + Grapevine(u64), +} + +impl DbVersion { + /// Return whether this version is part of the compatible version history + /// for grapevine. + /// + /// Version changes that were made to conduit after the fork, or made in + /// other conduit forks are not compatible. + pub(crate) fn is_compatible(self) -> bool { + match self { + DbVersion::Conduit(version) => { + // Conduit db version 13 is the last supported version in the + // shared namespace. Our schema diverges past + // this point. + version <= 13 + } + DbVersion::Grapevine(_) => true, + } + } +} + +impl PartialOrd for DbVersion { + fn partial_cmp(&self, other: &DbVersion) -> Option { + use DbVersion::{Conduit, Grapevine}; + match (self, other) { + (Conduit(a), Conduit(b)) | (Grapevine(a), Grapevine(b)) => { + Some(a.cmp(b)) + } + (&Conduit(_), Grapevine(_)) => { + self.is_compatible().then_some(Ordering::Less) + } + (Grapevine(_), &Conduit(_)) => { + other.is_compatible().then_some(Ordering::Greater) + } + } + } +} + /// Similar to [`ServerSigningKeys`], but drops a few unnecessary fields we /// don't require post-validation #[derive(Deserialize, Debug, Clone)] @@ -117,6 +182,7 @@ pub(crate) trait Data: Send + Sync { &self, origin: &ServerName, ) -> Result>; - fn database_version(&self) -> Result; - fn bump_database_version(&self, new_version: u64) -> Result<()>; + + fn database_version(&self) -> Result; + fn bump_database_version(&self, new_version: DbVersion) -> Result<()>; } diff --git a/src/utils/error.rs b/src/utils/error.rs index 49b5ab15..87194246 100644 --- a/src/utils/error.rs +++ b/src/utils/error.rs @@ -84,6 +84,9 @@ pub(crate) enum Error { UnsupportedRoomVersion(ruma::RoomVersionId), #[error("{0} in {1}")] InconsistentRoomState(&'static str, ruma::OwnedRoomId), + + #[error("unknown db version namespace {}", String::from_utf8_lossy(_0))] + UnknownDbVersionNamespace(Vec), } impl Error {