grapevine-specific db version namespace

This allows us to break db compatibility in a way that ensures the worst
outcome of running our db against an incompatible server is that the
server exits immediately with an error.
This commit is contained in:
Benjamin Lee 2024-08-30 22:30:44 -07:00
parent 9add9a1e96
commit 82a75dc2c1
No known key found for this signature in database
GPG key ID: FB9624E2885D55A4
5 changed files with 174 additions and 32 deletions

View file

@ -25,6 +25,7 @@ use tracing::{debug, error, info, info_span, warn, Instrument};
use crate::{ use crate::{
config::DatabaseBackend, config::DatabaseBackend,
service::{ service::{
globals::DbVersion,
media::MediaFileKey, media::MediaFileKey,
rooms::{ rooms::{
short::{ShortEventId, ShortStateHash, ShortStateKey}, short::{ShortEventId, ShortStateHash, ShortStateKey},
@ -534,11 +535,20 @@ impl KeyValueDatabase {
#[allow(clippy::too_many_lines)] #[allow(clippy::too_many_lines)]
pub(crate) async fn apply_migrations(&self) -> Result<()> { pub(crate) async fn apply_migrations(&self) -> Result<()> {
// If the database has any data, perform data migrations before starting // If the database has any data, perform data migrations before starting
let latest_database_version = 13; let latest_database_version = DbVersion::Conduit(13);
let current_database_version = services().globals.database_version()?;
if !current_database_version.is_compatible() {
error!(version = ?current_database_version, "current db version is unsupported");
return Err(Error::bad_database(
"Database was last written with a conduit schema version \
newer than we support.",
));
}
if services().users.count()? > 0 { if services().users.count()? > 0 {
// MIGRATIONS // MIGRATIONS
migration(1, || { migration(DbVersion::Conduit(1), || {
for (roomserverid, _) in self.roomserverids.iter() { for (roomserverid, _) in self.roomserverids.iter() {
let mut parts = roomserverid.split(|&b| b == 0xFF); let mut parts = roomserverid.split(|&b| b == 0xFF);
let room_id = let room_id =
@ -556,7 +566,7 @@ impl KeyValueDatabase {
Ok(()) Ok(())
})?; })?;
migration(2, || { migration(DbVersion::Conduit(2), || {
// We accidentally inserted hashed versions of "" into the db // We accidentally inserted hashed versions of "" into the db
// instead of just "" // instead of just ""
for (userid, password) in self.userid_password.iter() { for (userid, password) in self.userid_password.iter() {
@ -574,7 +584,7 @@ impl KeyValueDatabase {
Ok(()) Ok(())
})?; })?;
migration(3, || { migration(DbVersion::Conduit(3), || {
// Move media to filesystem // Move media to filesystem
for (key, content) in self.mediaid_file.iter() { for (key, content) in self.mediaid_file.iter() {
let key = MediaFileKey::new(key); let key = MediaFileKey::new(key);
@ -590,7 +600,7 @@ impl KeyValueDatabase {
Ok(()) Ok(())
})?; })?;
migration(4, || { migration(DbVersion::Conduit(4), || {
// Add federated users to services() as deactivated // Add federated users to services() as deactivated
for our_user in services().users.iter() { for our_user in services().users.iter() {
let our_user = our_user?; let our_user = our_user?;
@ -616,7 +626,7 @@ impl KeyValueDatabase {
Ok(()) Ok(())
})?; })?;
migration(5, || { migration(DbVersion::Conduit(5), || {
// Upgrade user data store // Upgrade user data store
for (roomuserdataid, _) in for (roomuserdataid, _) in
self.roomuserdataid_accountdata.iter() self.roomuserdataid_accountdata.iter()
@ -639,7 +649,7 @@ impl KeyValueDatabase {
Ok(()) Ok(())
})?; })?;
migration(6, || { migration(DbVersion::Conduit(6), || {
// Set room member count // Set room member count
for (roomid, _) in self.roomid_shortstatehash.iter() { for (roomid, _) in self.roomid_shortstatehash.iter() {
let string = utils::string_from_bytes(&roomid).unwrap(); let string = utils::string_from_bytes(&roomid).unwrap();
@ -652,7 +662,7 @@ impl KeyValueDatabase {
Ok(()) Ok(())
})?; })?;
migration(7, || { migration(DbVersion::Conduit(7), || {
// Upgrade state store // Upgrade state store
let mut last_roomstates: HashMap<OwnedRoomId, ShortStateHash> = let mut last_roomstates: HashMap<OwnedRoomId, ShortStateHash> =
HashMap::new(); HashMap::new();
@ -787,7 +797,7 @@ impl KeyValueDatabase {
Ok(()) Ok(())
})?; })?;
migration(8, || { migration(DbVersion::Conduit(8), || {
// Generate short room ids for all rooms // Generate short room ids for all rooms
for (room_id, _) in self.roomid_shortstatehash.iter() { for (room_id, _) in self.roomid_shortstatehash.iter() {
let shortroomid = let shortroomid =
@ -843,7 +853,7 @@ impl KeyValueDatabase {
Ok(()) Ok(())
})?; })?;
migration(9, || { migration(DbVersion::Conduit(9), || {
// Update tokenids db layout // Update tokenids db layout
let mut iter = self let mut iter = self
.tokenids .tokenids
@ -891,7 +901,7 @@ impl KeyValueDatabase {
Ok(()) Ok(())
})?; })?;
migration(10, || { migration(DbVersion::Conduit(10), || {
// Add other direction for shortstatekeys // Add other direction for shortstatekeys
for (statekey, shortstatekey) in for (statekey, shortstatekey) in
self.statekey_shortstatekey.iter() self.statekey_shortstatekey.iter()
@ -908,14 +918,14 @@ impl KeyValueDatabase {
Ok(()) Ok(())
})?; })?;
migration(11, || { migration(DbVersion::Conduit(11), || {
self.db self.db
.open_tree("userdevicesessionid_uiaarequest")? .open_tree("userdevicesessionid_uiaarequest")?
.clear()?; .clear()?;
Ok(()) Ok(())
})?; })?;
migration(12, || { migration(DbVersion::Conduit(12), || {
for username in services().users.list_local_users()? { for username in services().users.list_local_users()? {
let user = match UserId::parse_with_server_name( let user = match UserId::parse_with_server_name(
username.clone(), username.clone(),
@ -1017,7 +1027,7 @@ impl KeyValueDatabase {
// This migration can be reused as-is anytime the server-default // This migration can be reused as-is anytime the server-default
// rules are updated. // rules are updated.
migration(13, || { migration(DbVersion::Conduit(13), || {
for username in services().users.list_local_users()? { for username in services().users.list_local_users()? {
let user = match UserId::parse_with_server_name( let user = match UserId::parse_with_server_name(
username.clone(), username.clone(),
@ -1079,7 +1089,7 @@ impl KeyValueDatabase {
info!( info!(
backend = %services().globals.config.database.backend, backend = %services().globals.config.database.backend,
version = latest_database_version, version = ?latest_database_version,
"Loaded database", "Loaded database",
); );
} else { } else {
@ -1092,7 +1102,7 @@ impl KeyValueDatabase {
info!( info!(
backend = %services().globals.config.database.backend, backend = %services().globals.config.database.backend,
version = latest_database_version, version = ?latest_database_version,
"Created new database", "Created new database",
); );
} }
@ -1151,7 +1161,7 @@ impl KeyValueDatabase {
/// If the current version is older than `new_version`, execute a migration /// If the current version is older than `new_version`, execute a migration
/// function. /// function.
fn migration<F>(new_version: u64, migration: F) -> Result<(), Error> fn migration<F>(new_version: DbVersion, migration: F) -> Result<(), Error>
where where
F: FnOnce() -> Result<(), Error>, F: FnOnce() -> Result<(), Error>,
{ {
@ -1159,7 +1169,7 @@ where
if current_version < new_version { if current_version < new_version {
migration()?; migration()?;
services().globals.bump_database_version(new_version)?; services().globals.bump_database_version(new_version)?;
warn!("Migration: {current_version} -> {new_version} finished"); warn!("Migration: {current_version:?} -> {new_version:?} finished");
} }
Ok(()) Ok(())
} }

View file

@ -11,12 +11,25 @@ use ruma::{
use crate::{ use crate::{
database::KeyValueDatabase, database::KeyValueDatabase,
service::{self, globals::SigningKeys}, service::{
self,
globals::{DbVersion, SigningKeys},
},
services, utils, Error, Result, services, utils, Error, Result,
}; };
pub(crate) const COUNTER: &[u8] = b"c"; pub(crate) const COUNTER: &[u8] = b"c";
/// Placeholder stored in `globals.version` to indicate that
/// `globals.namespaced_version` should be used instead. See the documentation
/// on [`DbVersion`] for more details.
///
/// The value is `i64::MAX` expressed as a `u64`. `i64::unsigned_abs` is a
/// `const fn` and is lossless for non-negative inputs, so no `as` cast (and no
/// lint suppression) is needed to compute it in const context.
const CONDUIT_PLACEHOLDER_VERSION: u64 = i64::MAX.unsigned_abs();
/// Namespace tag for [`DbVersion::Grapevine`].
const GRAPEVINE_VERSION_NAMESPACE: &[u8] = b"grapevine";
#[async_trait] #[async_trait]
impl service::globals::Data for KeyValueDatabase { impl service::globals::Data for KeyValueDatabase {
fn next_count(&self) -> Result<u64> { fn next_count(&self) -> Result<u64> {
@ -349,16 +362,63 @@ lasttimelinecount_cache: {lasttimelinecount_cache}\n"
Ok(signingkeys) Ok(signingkeys)
} }
fn database_version(&self) -> Result<u64> { fn database_version(&self) -> Result<DbVersion> {
self.global.get(b"version")?.map_or(Ok(0), |version| { let version =
utils::u64_from_bytes(&version).map_err(|_| { self.global.get(b"version")?.map_or(Ok(0), |version| {
Error::bad_database("Database version id is invalid.") utils::u64_from_bytes(&version).map_err(|_| {
}) Error::bad_database("Database version id is invalid.")
}) })
})?;
if version == CONDUIT_PLACEHOLDER_VERSION {
let value =
self.global.get(b"namespaced_version")?.ok_or_else(|| {
Error::bad_database("'namespace_version' is missing")
})?;
let mut parts = value.splitn(2, |&b| b == 0xFF);
let namespace = parts
.next()
.expect("splitn always returns at least one element");
let version_bytes = parts.next().ok_or_else(|| {
Error::bad_database(
"Invalid 'namespaced_version' value: missing separator",
)
})?;
let version =
utils::u64_from_bytes(version_bytes).map_err(|_| {
Error::bad_database(
"Invalid 'namespaced_version' value: version is not a \
u64",
)
})?;
if namespace == GRAPEVINE_VERSION_NAMESPACE {
Ok(DbVersion::Grapevine(version))
} else {
Err(Error::UnknownDbVersionNamespace(namespace.to_owned()))
}
} else {
Ok(DbVersion::Conduit(version))
}
} }
fn bump_database_version(&self, new_version: u64) -> Result<()> { fn bump_database_version(&self, new_version: DbVersion) -> Result<()> {
self.global.insert(b"version", &new_version.to_be_bytes())?; match new_version {
DbVersion::Grapevine(version) => {
let mut value = GRAPEVINE_VERSION_NAMESPACE.to_vec();
value.push(0xFF);
value.extend(&version.to_be_bytes());
self.global.insert(
b"version",
&CONDUIT_PLACEHOLDER_VERSION.to_be_bytes(),
)?;
self.global.insert(b"namespaced_version", &value)?;
}
DbVersion::Conduit(version) => {
self.global.insert(b"version", &version.to_be_bytes())?;
self.global.remove(b"namespaced_version")?;
}
}
Ok(()) Ok(())
} }
} }

View file

@ -15,7 +15,7 @@ use std::{
}; };
use base64::{engine::general_purpose, Engine as _}; use base64::{engine::general_purpose, Engine as _};
pub(crate) use data::{Data, SigningKeys}; pub(crate) use data::{Data, DbVersion, SigningKeys};
use futures_util::FutureExt; use futures_util::FutureExt;
use hyper::service::Service as _; use hyper::service::Service as _;
use hyper_util::{ use hyper_util::{
@ -592,11 +592,14 @@ impl Service {
}) })
} }
pub(crate) fn database_version(&self) -> Result<u64> { pub(crate) fn database_version(&self) -> Result<DbVersion> {
self.db.database_version() self.db.database_version()
} }
pub(crate) fn bump_database_version(&self, new_version: u64) -> Result<()> { pub(crate) fn bump_database_version(
&self,
new_version: DbVersion,
) -> Result<()> {
self.db.bump_database_version(new_version) self.db.bump_database_version(new_version)
} }

View file

@ -1,4 +1,5 @@
use std::{ use std::{
cmp::Ordering,
collections::BTreeMap, collections::BTreeMap,
time::{Duration, SystemTime}, time::{Duration, SystemTime},
}; };
@ -14,6 +15,70 @@ use serde::Deserialize;
use crate::{services, Result}; use crate::{services, Result};
/// Database schema version.
///
/// In conduit, database versions were tracked with a single integer value in
/// `globals.version`, incremented with every migration. Now, we want to make
/// our own db schema changes, which may not be compatible with the schema used
/// by conduit and other conduit forks. We also want to support users moving
/// existing databases between conduit, grapevine, and other conduit forks.
///
/// To handle this, we have namespacing for db versions. Version numbers that
/// match the upstream conduit versions are still stored directly in
/// `globals.version`. Versions that are grapevine-specific are stored in
/// `globals.namespaced_version`, with the value `"grapevine" + 0xFF + version`.
///
/// When a non-conduit compatible version is set, the value of `globals.version`
/// is set to `i64::MAX`. This ensures that software that only knows about the
/// conduit-compatible versioning will treat the db as an unknown future
/// version and refuse to interact with it.
///
/// The last shared version is `Conduit(13)`. Past this point, we start counting
/// at `Grapevine(0)`.
///
/// Equality is derived; ordering comes from a hand-written `PartialOrd` impl
/// rather than a derive.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub(crate) enum DbVersion {
/// Version namespace shared with conduit and other conduit forks.
Conduit(u64),
/// Grapevine-specific version namespace.
Grapevine(u64),
}
impl DbVersion {
/// Return whether this version is part of the compatible version history
/// for grapevine.
///
/// Version changes that were made to conduit after the fork, or made in
/// other conduit forks are not compatible.
pub(crate) fn is_compatible(self) -> bool {
match self {
DbVersion::Conduit(version) => {
// Conduit db version 13 is the last supported version in the
// shared namespace. Our schema diverges past
// this point.
version <= 13
}
DbVersion::Grapevine(_) => true,
}
}
}
impl PartialOrd for DbVersion {
fn partial_cmp(&self, other: &DbVersion) -> Option<Ordering> {
use DbVersion::{Conduit, Grapevine};
match (self, other) {
(Conduit(a), Conduit(b)) | (Grapevine(a), Grapevine(b)) => {
Some(a.cmp(b))
}
(&Conduit(_), Grapevine(_)) => {
self.is_compatible().then_some(Ordering::Less)
}
(Grapevine(_), &Conduit(_)) => {
other.is_compatible().then_some(Ordering::Greater)
}
}
}
}
/// Similar to [`ServerSigningKeys`], but drops a few unnecessary fields we /// Similar to [`ServerSigningKeys`], but drops a few unnecessary fields we
/// don't require post-validation /// don't require post-validation
#[derive(Deserialize, Debug, Clone)] #[derive(Deserialize, Debug, Clone)]
@ -117,6 +182,7 @@ pub(crate) trait Data: Send + Sync {
&self, &self,
origin: &ServerName, origin: &ServerName,
) -> Result<Option<SigningKeys>>; ) -> Result<Option<SigningKeys>>;
fn database_version(&self) -> Result<u64>;
fn bump_database_version(&self, new_version: u64) -> Result<()>; fn database_version(&self) -> Result<DbVersion>;
fn bump_database_version(&self, new_version: DbVersion) -> Result<()>;
} }

View file

@ -84,6 +84,9 @@ pub(crate) enum Error {
UnsupportedRoomVersion(ruma::RoomVersionId), UnsupportedRoomVersion(ruma::RoomVersionId),
#[error("{0} in {1}")] #[error("{0} in {1}")]
InconsistentRoomState(&'static str, ruma::OwnedRoomId), InconsistentRoomState(&'static str, ruma::OwnedRoomId),
#[error("unknown db version namespace {}", String::from_utf8_lossy(_0))]
UnknownDbVersionNamespace(Vec<u8>),
} }
impl Error { impl Error {