Merge branch 'benjamin/break-db-compatibility' into 'main'

break db compatibility

See merge request matrix/grapevine!85
This commit is contained in:
Benjamin Lee 2024-09-26 22:24:23 +00:00
commit c1f158276e
12 changed files with 894 additions and 35 deletions

20
Cargo.lock generated
View file

@ -640,6 +640,12 @@ version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a"
[[package]]
name = "fastrand"
version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8c02a5121d4ea3eb16a80748c74f5549a5665e4c21333c6098f283870fbdea6"
[[package]]
name = "fdeflate"
version = "0.3.4"
@ -843,6 +849,7 @@ dependencies = [
"serde_yaml",
"sha-1",
"strum",
"tempfile",
"thiserror",
"thread_local",
"tikv-jemallocator",
@ -2782,6 +2789,19 @@ dependencies = [
"futures-core",
]
[[package]]
name = "tempfile"
version = "3.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04cbcdd0c794ebb0d4cf35e88edd2f7d2c4c3e9a5a6dab322839b321c6a87a64"
dependencies = [
"cfg-if",
"fastrand",
"once_cell",
"rustix",
"windows-sys 0.59.0",
]
[[package]]
name = "terminal_size"
version = "0.3.0"

View file

@ -154,3 +154,6 @@ jemalloc = ["dep:tikv-jemallocator"]
rocksdb = ["dep:rocksdb"]
sqlite = ["dep:rusqlite", "dep:parking_lot", "tokio/signal"]
systemd = ["dep:sd-notify"]
[dev-dependencies]
tempfile = "3.12.0"

View file

@ -136,6 +136,10 @@ This will be the first release of Grapevine since it was forked from Conduit
the server is now behind the `serve` command, so `grapevine --config ...`
becomes `grapevine serve --config ...`.
([!108](https://gitlab.computer.surgery/matrix/grapevine/-/merge_requests/108))
14. **BREAKING** Break db compatibility with conduit. In order to move back to
conduit from grapevine, admins now need to use the `grapevine db migrate`
command.
([!85](https://gitlab.computer.surgery/matrix/grapevine/-/merge_requests/85))
### Fixed
@ -248,3 +252,6 @@ This will be the first release of Grapevine since it was forked from Conduit
[!102](https://gitlab.computer.surgery/matrix/grapevine/-/merge_requests/102))
19. Allow configuring the served API components per listener.
([!109](https://gitlab.computer.surgery/matrix/grapevine/-/merge_requests/109))
20. Added `grapevine db migrate` CLI command, to migrate database between server
implementations.
([!85](https://gitlab.computer.surgery/matrix/grapevine/-/merge_requests/85))

View file

@ -3,12 +3,13 @@
//! CLI argument structs are defined in this module. Execution logic for each
//! command goes in a submodule.
use std::path::PathBuf;
use std::{path::PathBuf, str::FromStr};
use clap::{Parser, Subcommand};
use crate::error;
mod migrate_db;
mod serve;
/// Command line arguments
@ -26,6 +27,10 @@ pub(crate) struct Args {
pub(crate) enum Command {
/// Run the server.
Serve(ServeArgs),
/// Commands for interacting with the database.
#[clap(subcommand)]
Db(DbCommand),
}
/// Wrapper for the `--config` arg.
@ -57,10 +62,151 @@ pub(crate) struct ServeArgs {
pub(crate) config: ConfigArg,
}
#[derive(Subcommand)]
pub(crate) enum DbCommand {
/// Migrate database from one server implementation to another.
///
/// This command is not protected against symlink-swapping attacks. Do not
/// use it when any subdirectories or parents of the `--in`, `--out`, or
/// `--inplace` directories may be written by an untrusted user during
/// execution.
Migrate(MigrateDbArgs),
}
#[derive(clap::Args)]
pub(crate) struct MigrateDbArgs {
    #[clap(flatten)]
    config: ConfigArg,

    /// Target server implementation to migrate database to.
    ///
    /// If migrating to the current version of grapevine, specify the version
    /// as 'grapevine'.
    ///
    /// If migrating to a released version of conduit, specify the version
    /// of conduit as `conduit-{version}` (example: `conduit-0.8.0`). If
    /// migrating to an unreleased conduit build, instead specify the raw
    /// database version as `conduit-db-{version}` (example: `conduit-db-13`).
    /// The raw database version can be found by looking at the
    /// `latest_database_version` variable in `src/database/mod.rs`.
    ///
    /// The server implementation used for the current database will be
    /// detected automatically, and does not need to be specified.
    #[clap(long)]
    pub(crate) to: DbMigrationTarget,

    /// Path to read database from.
    #[clap(long = "in", short, required_unless_present("inplace_path"))]
    pub(crate) in_path: Option<PathBuf>,

    /// Path to write migrated database to.
    #[clap(long = "out", short, required_unless_present("inplace_path"))]
    pub(crate) out_path: Option<PathBuf>,

    /// Path to modify an existing database in-place, instead of copying before
    /// migrating.
    ///
    /// Note that even a successful migration may lose data, because some parts
    /// of the schema present in the initial database may not exist in the
    /// target version. Because of this, it's very important to have a
    /// backup of the initial database when migrating. The preferred way to
    /// do this is with the --in and --out flags, which ensure that the
    /// original database path is left unmodified. In some situations, it
    /// may be possible to take a backup some other way (transferring it
    /// over the network, for example), but copying the files locally is
    /// undesirable. In this case, setting the --i-have-tested-my-backups
    /// flag enables the use of --inplace to modify the database without
    /// copying to a new location first.
    #[clap(long = "inplace", conflicts_with_all(["in_path", "out_path"]))]
    pub(crate) inplace_path: Option<PathBuf>,

    /// Set if you have tested your backups, to enable use of the --inplace
    /// flag.
    ///
    /// See the documentation of --inplace for more details.
    #[clap(long)]
    pub(crate) i_have_tested_my_backups: bool,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) enum DbMigrationTarget {
/// The latest grapevine db version
///
/// Example:
///
/// ```
/// assert_eq!("grapevine".parse(), Ok(DbMigrationTarget::Grapevine))
/// ```
Grapevine,
/// A conduit-compatible db version.
///
/// This may either be specified as a released version number or directly
/// as a database version. The raw database version must be used when
/// migrating to a conduit deployment built from an unreleased commit
/// on the `next` branch.
Conduit(ConduitDbVersion),
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) enum ConduitDbVersion {
    /// A conduit release version number
    ///
    /// Example:
    ///
    /// ```
    /// assert_eq!(
    ///     "conduit-0.8.0".parse(),
    ///     Ok(DbMigrationTarget::Conduit(ConduitDbVersion::Release(
    ///         "0.8.0".to_owned(),
    ///     )))
    /// );
    /// ```
    Release(String),

    /// A raw database version
    ///
    /// This corresponds directly to a
    /// [`crate::service::globals::DbVersion::Conduit`] version.
    ///
    /// Example:
    ///
    /// ```
    /// assert_eq!(
    ///     "conduit-db-13".parse(),
    ///     Ok(DbMigrationTarget::Conduit(ConduitDbVersion::Db(13)))
    /// );
    /// ```
    Db(u64),
}
#[derive(thiserror::Error, Debug)]
#[error("invalid db migration target version")]
pub(crate) struct DbMigrationTargetParseError;
impl FromStr for DbMigrationTarget {
    type Err = DbMigrationTargetParseError;

    /// Parse a migration target: `grapevine`, `conduit-db-{n}`, or
    /// `conduit-{release}`. Anything else is rejected.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        if s == "grapevine" {
            return Ok(Self::Grapevine);
        }
        // `conduit-db-` must be tested before the shorter `conduit-`
        // prefix, or raw db versions would parse as release strings.
        if let Some(raw) = s.strip_prefix("conduit-db-") {
            let version: u64 =
                raw.parse().map_err(|_| DbMigrationTargetParseError)?;
            return Ok(Self::Conduit(ConduitDbVersion::Db(version)));
        }
        if let Some(release) = s.strip_prefix("conduit-") {
            return Ok(Self::Conduit(ConduitDbVersion::Release(
                release.to_owned(),
            )));
        }
        Err(DbMigrationTargetParseError)
    }
}
impl Args {
pub(crate) async fn run(self) -> Result<(), error::Main> {
match self.command {
Command::Serve(args) => serve::run(args).await?,
Command::Db(DbCommand::Migrate(args)) => {
migrate_db::run(args).await?;
}
}
Ok(())
}

132
src/cli/migrate_db.rs Normal file
View file

@ -0,0 +1,132 @@
use std::cmp::Ordering;
use tracing::info;
use super::{ConduitDbVersion, DbMigrationTarget, MigrateDbArgs};
use crate::{
config, database::KeyValueDatabase, error, observability,
service::globals::DbVersion, services, utils::copy_dir, Services,
};
impl DbMigrationTarget {
    /// Resolve this CLI-level migration target into a concrete
    /// [`DbVersion`].
    ///
    /// Known conduit release version strings are mapped to the raw db
    /// version that release shipped with; unrecognized releases are
    /// rejected with `TargetVersionUnsupported`.
    fn to_db_version(&self) -> Result<DbVersion, error::MigrateDbCommand> {
        use error::MigrateDbCommand as Error;

        match self {
            // The latest grapevine db version.
            Self::Grapevine => Ok(DbVersion::Grapevine(0)),
            Self::Conduit(ConduitDbVersion::Db(raw)) => {
                Ok(DbVersion::Conduit(*raw))
            }
            Self::Conduit(ConduitDbVersion::Release(release)) => {
                match release.as_str() {
                    "0.8.0" => Ok(DbVersion::Conduit(13)),
                    _ => Err(Error::TargetVersionUnsupported),
                }
            }
        }
    }
}
/// Entry point for the `db migrate` CLI subcommand.
///
/// Loads the database at the resolved path, brings up the full service
/// stack against it, and migrates the schema to the version requested by
/// `--to`. Returns an error before touching the database if the flags,
/// config, or version combination is invalid.
pub(crate) async fn run(
    args: MigrateDbArgs,
) -> Result<(), error::MigrateDbCommand> {
    use error::MigrateDbCommand as Error;

    // Determine the directory the migration will run against. `--inplace`
    // operates on the existing database and requires the explicit
    // backup confirmation flag; otherwise `--in` is copied to `--out`
    // first and the migration runs on the copy.
    let db_path = if let Some(path) = args.inplace_path {
        if !args.i_have_tested_my_backups {
            return Err(Error::InplaceUnconfirmed);
        }
        path
    } else {
        let in_path = args
            .in_path
            .expect("in_path should be required if inplace_path is unset");
        let out_path = args
            .out_path
            .expect("out_path should be required if inplace_path is unset");
        copy_dir(&in_path, &out_path).await.map_err(Error::Copy)?;
        out_path
    };

    let mut config = config::load(args.config.config.as_ref()).await?;
    // mutating the config like this is ugly, but difficult to avoid. Currently
    // the database is very tightly coupled with service code, which reads the
    // path only from the config.
    db_path
        .to_str()
        .ok_or(Error::InvalidUnicodeOutPath)?
        .clone_into(&mut config.database.path);

    let (_guard, reload_handles) = observability::init(&config)?;

    // Leak the database handle: `Services::build` takes a reference that
    // must outlive the global services, and this process exits when the
    // migration finishes anyway.
    let db = Box::leak(Box::new(
        KeyValueDatabase::load_or_create(&config).map_err(Error::LoadDb)?,
    ));

    Services::build(db, config, reload_handles)
        .map_err(Error::InitializeServices)?
        .install();

    services().globals.err_if_server_name_changed()?;

    let get_current =
        || services().globals.database_version().map_err(Error::MigrateDb);
    let current = get_current()?;
    let target = args.to.to_db_version()?;
    let latest = DbVersion::Grapevine(0);
    info!("Migrating from {current:?} to {target:?}");

    // Migrating to or from a conduit-namespaced version requires the
    // conduit_compat config option.
    if !services().globals.config.conduit_compat {
        if let DbMigrationTarget::Conduit(_) = args.to {
            return Err(Error::ConduitCompatDisabled);
        }
        if let DbVersion::Conduit(_) = current {
            return Err(Error::ConduitCompatDisabled);
        }
    }

    if target == current {
        // No-op
    } else if target == latest {
        // Migrate to latest grapevine.
        // `partial_cmp` returns `None` for incompatible (post-fork conduit)
        // versions, which are rejected here.
        if !current.partial_cmp(&latest).is_some_and(Ordering::is_le) {
            return Err(Error::DbVersionUnsupported(current));
        }
        db.apply_migrations().await.map_err(Error::MigrateDb)?;
    } else if target == DbVersion::Conduit(13) {
        // Migrate to latest grapevine so we have a consistent starting point
        if !current.partial_cmp(&latest).is_some_and(Ordering::is_le) {
            return Err(Error::DbVersionUnsupported(current));
        }
        db.apply_migrations().await.map_err(Error::MigrateDb)?;
        assert_eq!(
            get_current()?,
            latest,
            "should have migrated to latest version"
        );
        // Undo Conduit(13) -> Grapevine(0)
        //
        // This is a no-op that only changes the db version namespace. Setting
        // the version to Conduit(_) will restore the original state.
        services()
            .globals
            .bump_database_version(DbVersion::Conduit(13))
            .map_err(Error::MigrateDb)?;
    } else {
        return Err(Error::TargetVersionUnsupported);
    }

    assert_eq!(
        get_current()?,
        target,
        "should have migrated to target version"
    );
    info!("Migration successful");
    Ok(())
}

View file

@ -25,6 +25,7 @@ use tracing::{debug, error, info, info_span, warn, Instrument};
use crate::{
config::DatabaseBackend,
service::{
globals::DbVersion,
media::MediaFileKey,
rooms::{
short::{ShortEventId, ShortStateHash, ShortStateKey},
@ -534,11 +535,20 @@ impl KeyValueDatabase {
#[allow(clippy::too_many_lines)]
pub(crate) async fn apply_migrations(&self) -> Result<()> {
// If the database has any data, perform data migrations before starting
let latest_database_version = 13;
let latest_database_version = DbVersion::Grapevine(0);
let current_database_version = services().globals.database_version()?;
if !current_database_version.is_compatible() {
error!(version = ?current_database_version, "current db version is unsupported");
return Err(Error::bad_database(
"Database was last written with a conduit schema version \
newer than we support.",
));
}
if services().users.count()? > 0 {
// MIGRATIONS
migration(1, || {
migration(DbVersion::Conduit(1), || {
for (roomserverid, _) in self.roomserverids.iter() {
let mut parts = roomserverid.split(|&b| b == 0xFF);
let room_id =
@ -556,7 +566,7 @@ impl KeyValueDatabase {
Ok(())
})?;
migration(2, || {
migration(DbVersion::Conduit(2), || {
// We accidentally inserted hashed versions of "" into the db
// instead of just ""
for (userid, password) in self.userid_password.iter() {
@ -574,7 +584,7 @@ impl KeyValueDatabase {
Ok(())
})?;
migration(3, || {
migration(DbVersion::Conduit(3), || {
// Move media to filesystem
for (key, content) in self.mediaid_file.iter() {
let key = MediaFileKey::new(key);
@ -590,7 +600,7 @@ impl KeyValueDatabase {
Ok(())
})?;
migration(4, || {
migration(DbVersion::Conduit(4), || {
// Add federated users to services() as deactivated
for our_user in services().users.iter() {
let our_user = our_user?;
@ -616,7 +626,7 @@ impl KeyValueDatabase {
Ok(())
})?;
migration(5, || {
migration(DbVersion::Conduit(5), || {
// Upgrade user data store
for (roomuserdataid, _) in
self.roomuserdataid_accountdata.iter()
@ -639,7 +649,7 @@ impl KeyValueDatabase {
Ok(())
})?;
migration(6, || {
migration(DbVersion::Conduit(6), || {
// Set room member count
for (roomid, _) in self.roomid_shortstatehash.iter() {
let string = utils::string_from_bytes(&roomid).unwrap();
@ -652,7 +662,7 @@ impl KeyValueDatabase {
Ok(())
})?;
migration(7, || {
migration(DbVersion::Conduit(7), || {
// Upgrade state store
let mut last_roomstates: HashMap<OwnedRoomId, ShortStateHash> =
HashMap::new();
@ -787,7 +797,7 @@ impl KeyValueDatabase {
Ok(())
})?;
migration(8, || {
migration(DbVersion::Conduit(8), || {
// Generate short room ids for all rooms
for (room_id, _) in self.roomid_shortstatehash.iter() {
let shortroomid =
@ -843,7 +853,7 @@ impl KeyValueDatabase {
Ok(())
})?;
migration(9, || {
migration(DbVersion::Conduit(9), || {
// Update tokenids db layout
let mut iter = self
.tokenids
@ -891,7 +901,7 @@ impl KeyValueDatabase {
Ok(())
})?;
migration(10, || {
migration(DbVersion::Conduit(10), || {
// Add other direction for shortstatekeys
for (statekey, shortstatekey) in
self.statekey_shortstatekey.iter()
@ -908,14 +918,14 @@ impl KeyValueDatabase {
Ok(())
})?;
migration(11, || {
migration(DbVersion::Conduit(11), || {
self.db
.open_tree("userdevicesessionid_uiaarequest")?
.clear()?;
Ok(())
})?;
migration(12, || {
migration(DbVersion::Conduit(12), || {
for username in services().users.list_local_users()? {
let user = match UserId::parse_with_server_name(
username.clone(),
@ -1017,7 +1027,7 @@ impl KeyValueDatabase {
// This migration can be reused as-is anytime the server-default
// rules are updated.
migration(13, || {
migration(DbVersion::Conduit(13), || {
for username in services().users.list_local_users()? {
let user = match UserId::parse_with_server_name(
username.clone(),
@ -1071,6 +1081,10 @@ impl KeyValueDatabase {
Ok(())
})?;
// Switch to the grapevine version namespace and break db
// compatibility with conduit. Otherwise a no-op.
migration(DbVersion::Grapevine(0), || Ok(()))?;
assert_eq!(
services().globals.database_version().unwrap(),
latest_database_version,
@ -1079,7 +1093,7 @@ impl KeyValueDatabase {
info!(
backend = %services().globals.config.database.backend,
version = latest_database_version,
version = ?latest_database_version,
"Loaded database",
);
} else {
@ -1092,7 +1106,7 @@ impl KeyValueDatabase {
info!(
backend = %services().globals.config.database.backend,
version = latest_database_version,
version = ?latest_database_version,
"Created new database",
);
}
@ -1151,7 +1165,7 @@ impl KeyValueDatabase {
/// If the current version is older than `new_version`, execute a migration
/// function.
fn migration<F>(new_version: u64, migration: F) -> Result<(), Error>
fn migration<F>(new_version: DbVersion, migration: F) -> Result<(), Error>
where
F: FnOnce() -> Result<(), Error>,
{
@ -1159,7 +1173,7 @@ where
if current_version < new_version {
migration()?;
services().globals.bump_database_version(new_version)?;
warn!("Migration: {current_version} -> {new_version} finished");
warn!("Migration: {current_version:?} -> {new_version:?} finished");
}
Ok(())
}

View file

@ -11,12 +11,25 @@ use ruma::{
use crate::{
database::KeyValueDatabase,
service::{self, globals::SigningKeys},
service::{
self,
globals::{DbVersion, SigningKeys},
},
services, utils, Error, Result,
};
pub(crate) const COUNTER: &[u8] = b"c";
/// Placeholder stored in `globals.version` to indicate that
/// `globals.namespaced_version` should be used instead. See the documentation
/// on [`DbVersion`] for more details.
// Allowed because there isn't another way to do the conversion in const context
#[allow(clippy::as_conversions)]
const CONDUIT_PLACEHOLDER_VERSION: u64 = i64::MAX as u64;
/// Namespace tag for [`DbVersion::Grapevine`].
const GRAPEVINE_VERSION_NAMESPACE: &[u8] = b"grapevine";
#[async_trait]
impl service::globals::Data for KeyValueDatabase {
fn next_count(&self) -> Result<u64> {
@ -349,16 +362,63 @@ lasttimelinecount_cache: {lasttimelinecount_cache}\n"
Ok(signingkeys)
}
fn database_version(&self) -> Result<u64> {
self.global.get(b"version")?.map_or(Ok(0), |version| {
utils::u64_from_bytes(&version).map_err(|_| {
Error::bad_database("Database version id is invalid.")
})
})
fn database_version(&self) -> Result<DbVersion> {
let version =
self.global.get(b"version")?.map_or(Ok(0), |version| {
utils::u64_from_bytes(&version).map_err(|_| {
Error::bad_database("Database version id is invalid.")
})
})?;
if version == CONDUIT_PLACEHOLDER_VERSION {
let value =
self.global.get(b"namespaced_version")?.ok_or_else(|| {
Error::bad_database("'namespace_version' is missing")
})?;
let mut parts = value.splitn(2, |&b| b == 0xFF);
let namespace = parts
.next()
.expect("splitn always returns at least one element");
let version_bytes = parts.next().ok_or_else(|| {
Error::bad_database(
"Invalid 'namespaced_version' value: missing separator",
)
})?;
let version =
utils::u64_from_bytes(version_bytes).map_err(|_| {
Error::bad_database(
"Invalid 'namespaced_version' value: version is not a \
u64",
)
})?;
if namespace == GRAPEVINE_VERSION_NAMESPACE {
Ok(DbVersion::Grapevine(version))
} else {
Err(Error::UnknownDbVersionNamespace(namespace.to_owned()))
}
} else {
Ok(DbVersion::Conduit(version))
}
}
fn bump_database_version(&self, new_version: u64) -> Result<()> {
self.global.insert(b"version", &new_version.to_be_bytes())?;
    /// Persist a new database schema version.
    ///
    /// Grapevine-namespaced versions are stored in `namespaced_version` as
    /// `b"grapevine" + 0xFF + big-endian version`, with the plain `version`
    /// key set to [`CONDUIT_PLACEHOLDER_VERSION`] so conduit-era software
    /// treats the database as an unknown future version. Conduit-namespaced
    /// versions are stored directly in `version`, and the
    /// `namespaced_version` key is removed.
    fn bump_database_version(&self, new_version: DbVersion) -> Result<()> {
        match new_version {
            DbVersion::Grapevine(version) => {
                // Build `namespace + 0xFF separator + big-endian version`.
                let mut value = GRAPEVINE_VERSION_NAMESPACE.to_vec();
                value.push(0xFF);
                value.extend(&version.to_be_bytes());
                self.global.insert(
                    b"version",
                    &CONDUIT_PLACEHOLDER_VERSION.to_be_bytes(),
                )?;
                self.global.insert(b"namespaced_version", &value)?;
            }
            DbVersion::Conduit(version) => {
                self.global.insert(b"version", &version.to_be_bytes())?;
                self.global.remove(b"namespaced_version")?;
            }
        }
        Ok(())
    }
}

View file

@ -4,7 +4,7 @@ use std::{fmt, iter, path::PathBuf};
use thiserror::Error;
use crate::config::ListenConfig;
use crate::{config::ListenConfig, service::globals::DbVersion};
/// Formats an [`Error`][0] and its [`source`][1]s with a separator
///
@ -42,6 +42,9 @@ impl fmt::Display for DisplayWithSources<'_> {
pub(crate) enum Main {
#[error(transparent)]
ServeCommand(#[from] ServeCommand),
#[error(transparent)]
MigrateDbCommand(#[from] MigrateDbCommand),
}
/// Errors returned from the `serve` CLI subcommand.
@ -85,6 +88,105 @@ pub(crate) enum ServerNameChanged {
Renamed,
}
/// Top-level errors from the `db migrate` subcommand.
// Missing docs are allowed here since that kind of information should be
// encoded in the error messages themselves anyway.
#[allow(missing_docs)]
#[derive(Error, Debug)]
pub(crate) enum MigrateDbCommand {
    #[error("output path is not valid unicode")]
    InvalidUnicodeOutPath,
    #[error(
        "conduit_compat config option must be enabled to migrate database to \
         or from conduit. Note that you cannot currently enable \
         conduit_compat on a database that was originally created in \
         grapevine with it disabled."
    )]
    ConduitCompatDisabled,
    // The actual CLI flags are `--in`/`--out` (see `MigrateDbArgs`);
    // the message previously named nonexistent `--from`/`--to` flags.
    #[error(
        "migrating a database may lose data even if the migration is \
         successful. Because of this, it is very important to ensure you have \
         a working backup when using the --inplace flag. If you have a tested \
         backup, set the --i-have-tested-my-backups flag to enable use of \
         --inplace. Alternatively, use --in and --out instead of --inplace \
         to ensure the original database is preserved."
    )]
    InplaceUnconfirmed,
    #[error("failed to copy existing database directory")]
    Copy(#[source] CopyDir),
    #[error("failed to initialize observability")]
    Observability(#[from] Observability),
    #[error("failed to load configuration")]
    Config(#[from] Config),
    #[error("failed to load database")]
    LoadDb(#[source] crate::utils::error::Error),
    #[error("failed to initialize services")]
    InitializeServices(#[source] crate::utils::error::Error),
    #[error("`server_name` change check failed")]
    ServerNameChanged(#[from] ServerNameChanged),
    #[error("failed to migrate database")]
    MigrateDb(#[source] crate::utils::error::Error),
    #[error("initial database version is not supported for migration: {_0:?}")]
    DbVersionUnsupported(DbVersion),
    #[error("target database version is not supported for migration")]
    TargetVersionUnsupported,
}
/// Errors copying a directory recursively.
///
/// Returned by the [`crate::utils::copy_dir`] function.
// Missing docs are allowed here since that kind of information should be
// encoded in the error messages themselves anyway.
#[allow(missing_docs)]
#[derive(Error, Debug)]
pub(crate) enum CopyDir {
    #[error("source and destination paths overlap")]
    Overlap,
    #[error("destination path already exists")]
    AlreadyExists,
    #[error("failed to canonicalize source path to check for overlap")]
    CanonicalizeIn(#[source] std::io::Error),
    // The destination may not exist yet, so `copy_dir` canonicalizes it
    // with `partial_canonicalize` rather than `fs::canonicalize`.
    #[error("failed to canonicalize destination path to check for overlap")]
    CanonicalizeOut(#[source] std::io::Error),
    #[error("failed to check whether destination path exists")]
    CheckExists(#[source] std::io::Error),
    #[error("failed to create destination directory at {}", _0.display())]
    CreateDir(PathBuf, #[source] std::io::Error),
    #[error("failed to read contents of directory at {}", _0.display())]
    ReadDir(PathBuf, #[source] std::io::Error),
    #[error("failed to read file metadata at {}", _0.display())]
    Metadata(PathBuf, #[source] std::io::Error),
    #[error("failed to copy file from {} to {}", from.display(), to.display())]
    CopyFile {
        from: PathBuf,
        to: PathBuf,
        #[source]
        error: std::io::Error,
    },
    // Symlinks in the source tree abort the copy rather than being
    // followed or recreated; see the caveats on `copy_dir`.
    #[error("source directory contains a symlink at {}. Refusing to copy.", _0.display())]
    Symlink(PathBuf),
}
/// Observability initialization errors
// Missing docs are allowed here since that kind of information should be
// encoded in the error messages themselves anyway.

View file

@ -15,7 +15,7 @@ use std::{
};
use base64::{engine::general_purpose, Engine as _};
pub(crate) use data::{Data, SigningKeys};
pub(crate) use data::{Data, DbVersion, SigningKeys};
use futures_util::FutureExt;
use hyper::service::Service as _;
use hyper_util::{
@ -592,11 +592,14 @@ impl Service {
})
}
pub(crate) fn database_version(&self) -> Result<u64> {
pub(crate) fn database_version(&self) -> Result<DbVersion> {
self.db.database_version()
}
pub(crate) fn bump_database_version(&self, new_version: u64) -> Result<()> {
pub(crate) fn bump_database_version(
&self,
new_version: DbVersion,
) -> Result<()> {
self.db.bump_database_version(new_version)
}

View file

@ -1,4 +1,5 @@
use std::{
cmp::Ordering,
collections::BTreeMap,
time::{Duration, SystemTime},
};
@ -14,6 +15,70 @@ use serde::Deserialize;
use crate::{services, Result};
/// Database schema version.
///
/// In conduit, database versions were tracked with a single integer value in
/// `globals.version`, incremented with every migration. Now, we want to make
/// our own db schema changes, which may not be compatible with the schema
/// used by conduit and other conduit forks. We also want to support users
/// moving existing databases between conduit, grapevine, and other conduit
/// forks.
///
/// To handle this, we have namespacing for db versions. Version numbers that
/// match the upstream conduit versions are still stored directly in
/// `globals.version`. Versions that are grapevine-specific are stored in
/// `globals.namespaced_version`, with the value `"grapevine" + 0xFF + version`.
///
/// When a non-conduit compatible version is set, the value of `globals.version`
/// is set to `i64::MAX`. This ensures that software that only knows about the
/// conduit-compatible versioning will treat the db as an unknown future version
/// and refuse to interact with it.
///
/// The last shared version is `Conduit(13)`. Past this point, we start counting
/// at `Grapevine(0)`.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub(crate) enum DbVersion {
    /// Version namespace shared with conduit and other conduit forks.
    Conduit(u64),
    /// Grapevine-specific version namespace.
    Grapevine(u64),
}
impl DbVersion {
    /// Return whether this version is part of the compatible version history
    /// for grapevine.
    ///
    /// Version changes that were made to conduit after the fork, or made in
    /// other conduit forks are not compatible.
    pub(crate) fn is_compatible(self) -> bool {
        match self {
            // Every grapevine-namespaced version is ours by definition.
            DbVersion::Grapevine(_) => true,
            // Conduit db version 13 is the last supported version in the
            // shared namespace; our schema diverges past that point.
            DbVersion::Conduit(version) => version <= 13,
        }
    }
}
impl PartialOrd for DbVersion {
    /// Order versions across the two namespaces.
    ///
    /// Versions in the same namespace compare by their numeric value.
    /// Across namespaces, a compatible `Conduit` version (<= 13) precedes
    /// every `Grapevine` version, since the grapevine history continues
    /// from `Conduit(13)`. Incompatible `Conduit` versions (> 13) have no
    /// defined order relative to `Grapevine` versions, so `partial_cmp`
    /// returns `None` for those pairs.
    fn partial_cmp(&self, other: &DbVersion) -> Option<Ordering> {
        use DbVersion::{Conduit, Grapevine};
        match (self, other) {
            (Conduit(a), Conduit(b)) | (Grapevine(a), Grapevine(b)) => {
                Some(a.cmp(b))
            }
            // Ordered only when the conduit side is within the shared
            // (compatible) history; otherwise unordered -> None.
            (&Conduit(_), Grapevine(_)) => {
                self.is_compatible().then_some(Ordering::Less)
            }
            (Grapevine(_), &Conduit(_)) => {
                other.is_compatible().then_some(Ordering::Greater)
            }
        }
    }
}
/// Similar to [`ServerSigningKeys`], but drops a few unnecessary fields we
/// don't require post-validation
#[derive(Deserialize, Debug, Clone)]
@ -117,6 +182,7 @@ pub(crate) trait Data: Send + Sync {
&self,
origin: &ServerName,
) -> Result<Option<SigningKeys>>;
fn database_version(&self) -> Result<u64>;
fn bump_database_version(&self, new_version: u64) -> Result<()>;
fn database_version(&self) -> Result<DbVersion>;
fn bump_database_version(&self, new_version: DbVersion) -> Result<()>;
}

View file

@ -6,6 +6,8 @@ use std::{
borrow::Cow,
cmp, fmt,
fmt::Write,
io,
path::{Component, Path, PathBuf},
str::FromStr,
time::{SystemTime, UNIX_EPOCH},
};
@ -18,6 +20,7 @@ use ruma::{
api::client::error::ErrorKind, canonical_json::try_from_json_map,
CanonicalJsonError, CanonicalJsonObject, MxcUri, MxcUriError, OwnedMxcUri,
};
use tokio::fs;
use crate::{Error, Result};
@ -379,9 +382,165 @@ pub(crate) fn u8_slice_to_hex(slice: &[u8]) -> String {
})
}
/// Canonicalize a path where some components may not exist yet.
///
/// It's assumed that non-existent components will be created as
/// directories. This should match the result of [`fs::canonicalize`]
/// _after_ calling [`fs::create_dir_all`] on `path`.
async fn partial_canonicalize(path: &Path) -> io::Result<PathBuf> {
    // Relative paths are resolved against the current working directory.
    let mut ret = std::env::current_dir()?;
    let mut base_path = Cow::Borrowed(path);
    let mut components = base_path.components();
    while let Some(component) = components.next() {
        match component {
            // An absolute prefix/root restarts resolution from that root.
            Component::Prefix(_) | Component::RootDir => {
                let component_path: &Path = component.as_ref();
                component_path.clone_into(&mut ret);
            }
            Component::CurDir => (),
            Component::ParentDir => {
                ret.pop();
            }
            Component::Normal(p) => {
                let component_path = ret.join(p);
                match fs::symlink_metadata(&component_path).await {
                    // path is a symlink
                    Ok(metadata) if metadata.is_symlink() => {
                        let destination =
                            fs::read_link(&component_path).await?;
                        // iterate over the symlink destination components
                        // before continuing with the original path.
                        // A relative destination resolves against `ret`
                        // (the link's parent); an absolute one resets
                        // `ret` via the Prefix/RootDir arm above.
                        base_path =
                            Cow::Owned(destination.join(components.as_path()));
                        components = base_path.components();
                    }
                    // path exists, not a symlink
                    Ok(_) => {
                        ret.push(p);
                    }
                    // path does not exist
                    Err(error) if error.kind() == io::ErrorKind::NotFound => {
                        // assume a directory will be created here
                        ret.push(p);
                    }
                    Err(error) => return Err(error),
                }
            }
        }
    }
    Ok(ret)
}
/// Recursively copy a directory from `root_in` to `root_out`.
///
/// This function is not protected against symlink-swapping attacks. Do not use
/// it when any subdirectories or parents of `root_in` or `root_out` may be
/// writable by an untrusted user.
///
/// If `root_in` and `root_out` are the same path, an error is returned.
///
/// If a directory or file already exists at `root_out`, an error is returned.
///
/// If the parent directories of the `root_out` path do not exist, they will be
/// created.
///
/// If an error occurs, the copy will be interrupted, and the output directory
/// may be left in an intermediate state. If this is undesirable, the caller
/// should delete the output directory on an error.
///
/// If the `root_in` directory contains a symlink, aborts the copy and returns
/// an [`crate::error::CopyDir::Symlink`].
pub(crate) async fn copy_dir(
    root_in: &Path,
    root_out: &Path,
) -> Result<(), crate::error::CopyDir> {
    use crate::error::CopyDir as Error;

    // Canonicalize both sides so the overlap check compares resolved
    // paths. The output may not exist yet, so it gets the partial variant.
    let root_in =
        fs::canonicalize(root_in).await.map_err(Error::CanonicalizeIn)?;
    let root_out =
        partial_canonicalize(root_out).await.map_err(Error::CanonicalizeOut)?;
    if root_in.starts_with(&root_out) || root_out.starts_with(&root_in) {
        return Err(Error::Overlap);
    }
    if fs::try_exists(&root_out).await.map_err(Error::CheckExists)? {
        return Err(Error::AlreadyExists);
    }
    if let Some(parent) = root_out.parent() {
        fs::create_dir_all(parent)
            .await
            .map_err(|e| Error::CreateDir(parent.to_owned(), e))?;
    }
    // Call 'create_dir' separately for the last dir so that we get an error if
    // it already exists. 'try_exists' doesn't fully check for this case
    // because TOCTOU.
    fs::create_dir(&root_out)
        .await
        .map_err(|e| Error::CreateDir(root_out.clone(), e))?;

    // Traverse with an explicit work stack of paths relative to the
    // roots, starting from "." (the roots themselves).
    let mut todo = vec![PathBuf::from(".")];
    while let Some(path) = todo.pop() {
        let dir_in = root_in.join(&path);
        let dir_out = root_out.join(&path);
        let mut entries = fs::read_dir(&dir_in)
            .await
            .map_err(|e| Error::ReadDir(dir_in.clone(), e))?;
        while let Some(entry) = entries
            .next_entry()
            .await
            .map_err(|e| Error::ReadDir(dir_in.clone(), e))?
        {
            let entry_in = dir_in.join(entry.file_name());
            let entry_out = dir_out.join(entry.file_name());
            let file_type = entry
                .file_type()
                .await
                .map_err(|e| Error::Metadata(entry_in.clone(), e))?;
            if file_type.is_dir() {
                // Create the destination dir now; copy its contents on a
                // later iteration.
                fs::create_dir(&entry_out)
                    .await
                    .map_err(|e| Error::CreateDir(entry_out.clone(), e))?;
                todo.push(path.join(entry.file_name()));
            } else if file_type.is_symlink() {
                // Refuse symlinks entirely; see the doc comment above.
                return Err(Error::Symlink(entry_in));
            } else {
                fs::copy(&entry_in, &entry_out).await.map_err(|error| {
                    Error::CopyFile {
                        from: entry_in.clone(),
                        to: entry_out.clone(),
                        error,
                    }
                })?;
            }
        }
    }
    Ok(())
}
#[cfg(test)]
mod tests {
use crate::utils::dbg_truncate_str;
use std::{
collections::HashMap,
io,
path::{Path, PathBuf},
};
use tempfile::TempDir;
use tokio::fs;
use crate::{
error,
utils::{copy_dir, dbg_truncate_str, partial_canonicalize},
};
#[test]
fn test_truncate_str() {
@ -395,4 +554,148 @@ mod tests {
assert_eq!(dbg_truncate_str(ok_hand, ok_hand.len() - 1), "👌🏽");
assert_eq!(dbg_truncate_str(ok_hand, ok_hand.len()), "👌🏽");
}
#[tokio::test]
async fn test_partial_canonicalize() {
let tmp_dir =
TempDir::with_prefix("test_partial_canonicalize").unwrap();
let path = tmp_dir.path();
fs::create_dir(&path.join("dir")).await.unwrap();
fs::symlink(path.join("dir"), path.join("absolute-link-to-dir"))
.await
.unwrap();
fs::symlink("./dir", path.join("relative-link-to-dir")).await.unwrap();
assert_eq!(partial_canonicalize(path).await.unwrap(), path);
assert_eq!(partial_canonicalize(&path.join("./")).await.unwrap(), path);
assert_eq!(
partial_canonicalize(&path.join("dir/..")).await.unwrap(),
path
);
assert_eq!(
partial_canonicalize(&path.join("absolute-link-to-dir"))
.await
.unwrap(),
path.join("dir")
);
assert_eq!(
partial_canonicalize(&path.join("relative-link-to-dir"))
.await
.unwrap(),
path.join("dir")
);
assert_eq!(
partial_canonicalize(&path.join("absolute-link-to-dir/new-dir"))
.await
.unwrap(),
path.join("dir/new-dir")
);
assert_eq!(
partial_canonicalize(
&path.join("absolute-link-to-dir/new-dir/../..")
)
.await
.unwrap(),
path,
);
tmp_dir.close().unwrap();
}
#[derive(Clone, Debug, Eq, PartialEq)]
enum PathContents {
Dir,
Symlink(PathBuf),
File(Vec<u8>),
}
async fn dir_contents(
root: &Path,
) -> io::Result<HashMap<PathBuf, PathContents>> {
let mut ret = HashMap::new();
let mut todo = vec![root.to_owned()];
while let Some(path) = todo.pop() {
let metadata = fs::symlink_metadata(&path).await?;
let contents = if metadata.is_file() {
PathContents::File(fs::read(&path).await?)
} else if metadata.is_dir() {
let mut entries = fs::read_dir(&path).await?;
while let Some(entry) = entries.next_entry().await? {
todo.push(entry.path());
}
PathContents::Dir
} else if metadata.is_symlink() {
PathContents::Symlink(fs::read_link(&path).await?)
} else {
continue;
};
ret.insert(path.strip_prefix(root).unwrap().to_owned(), contents);
}
Ok(ret)
}
#[tokio::test]
async fn test_copy_dir_simple() {
let tmp_dir = TempDir::with_prefix("test_copy_dir_simple").unwrap();
let path = tmp_dir.path();
fs::create_dir(&path.join("src")).await.unwrap();
fs::create_dir(&path.join("src/subdir")).await.unwrap();
fs::create_dir(&path.join("src/empty-subdir")).await.unwrap();
fs::write(&path.join("src/a.txt"), b"foo").await.unwrap();
fs::write(&path.join("src/subdir/b.txt"), b"bar").await.unwrap();
copy_dir(&path.join("src"), &path.join("dst")).await.unwrap();
let src_contents = dir_contents(&path.join("src")).await.unwrap();
let dst_contents = dir_contents(&path.join("dst")).await.unwrap();
assert_eq!(src_contents, dst_contents);
tmp_dir.close().unwrap();
}
#[tokio::test]
async fn test_copy_dir_overlap_error() {
let tmp_dir =
TempDir::with_prefix("test_copy_dir_overlap_error").unwrap();
let path = tmp_dir.path();
fs::create_dir(&path.join("src")).await.unwrap();
assert!(matches!(
copy_dir(&path.join("src"), &path.join("src/dst")).await,
Err(error::CopyDir::Overlap)
));
assert!(matches!(
copy_dir(&path.join("src"), &path.join("src")).await,
Err(error::CopyDir::Overlap)
));
assert!(matches!(
copy_dir(&path.join("src"), path).await,
Err(error::CopyDir::Overlap)
));
tmp_dir.close().unwrap();
}
#[tokio::test]
async fn test_copy_dir_symlink_error() {
let tmp_dir =
TempDir::with_prefix("test_copy_dir_overlap_error").unwrap();
let path = tmp_dir.path();
fs::create_dir(&path.join("src")).await.unwrap();
fs::symlink("./link-target", &path.join("src/link")).await.unwrap();
assert!(matches!(
copy_dir(&path.join("src"), &path.join("dst")).await,
Err(error::CopyDir::Symlink(p)) if p == path.join("src/link")
));
tmp_dir.close().unwrap();
}
}

View file

@ -84,6 +84,9 @@ pub(crate) enum Error {
UnsupportedRoomVersion(ruma::RoomVersionId),
#[error("{0} in {1}")]
InconsistentRoomState(&'static str, ruma::OwnedRoomId),
#[error("unknown db version namespace {}", String::from_utf8_lossy(_0))]
UnknownDbVersionNamespace(Vec<u8>),
}
impl Error {