mirror of https://gitlab.computer.surgery/matrix/grapevine.git
synced 2025-12-16 15:21:24 +01:00
add scan_prefix method
This commit is contained in:
parent 029e32971e
commit 132bd3ae3a

2 changed files with 161 additions and 1 deletion
@@ -9,6 +9,7 @@ use std::{
 };
 
+use frunk::{HCons, HNil};
 use futures_util::Stream;
 
 #[cfg(test)]
 mod tests;
@@ -58,6 +59,24 @@ pub(crate) trait Map {
     where
         Self::Key: Borrow<K>,
         K: ToBytes + ?Sized;
+
+    /// Get a stream of all key-value pairs whose key matches a key prefix
+    ///
+    /// While it's possible to provide an entire key as the prefix, it's likely
+    /// more ergonomic and more performant to use [`Map::get`] in that case
+    /// instead.
+    #[rustfmt::skip]
+    async fn scan_prefix<P>(
+        &self,
+        key: &P,
+    ) -> Result<
+        impl Stream<
+            Item = (Result<Self::Key, MapError>, Result<Self::Value, MapError>)
+        >,
+        MapError,
+    >
+    where
+        P: ToBytes + IsPrefixOf<Self::Key>;
 }
 
 /// Convert `Self` into bytes for storage in a key-value store
@@ -176,3 +195,15 @@ impl FromBytes for String {
         String::from_utf8(bytes).map_err(Into::into)
     }
 }
+
+/// Ensures, at compile time, that one `HList` is a prefix of another
+pub(crate) trait IsPrefixOf<HList> {}
+
+impl<HList> IsPrefixOf<HList> for HNil {}
+
+impl<Head, PrefixTail, Tail> IsPrefixOf<HCons<Head, Tail>>
+    for HCons<Head, PrefixTail>
+where
+    PrefixTail: IsPrefixOf<Tail>,
+{
+}
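Aside: the `IsPrefixOf` trait above is resolved entirely at compile time: `HNil` is a prefix of every `HList`, and `HCons<Head, PrefixTail>` is a prefix of `HCons<Head, Tail>` exactly when the head types match and `PrefixTail` is recursively a prefix of `Tail`. A minimal standalone sketch of how the bound behaves; the trait and impls mirror the patch, while `assert_prefix` and `main` are illustrative additions:

use frunk::{HCons, HNil};

trait IsPrefixOf<HList> {}

// The empty hlist is a prefix of anything.
impl<HList> IsPrefixOf<HList> for HNil {}

// A non-empty hlist is a prefix when the head types match and the
// prefix's tail is recursively a prefix of the other tail.
impl<Head, PrefixTail, Tail> IsPrefixOf<HCons<Head, Tail>>
    for HCons<Head, PrefixTail>
where
    PrefixTail: IsPrefixOf<Tail>,
{
}

// Compiles only when `Prefix` is a prefix of `Full`.
fn assert_prefix<Prefix, Full>()
where
    Prefix: IsPrefixOf<Full>,
{
}

fn main() {
    // Accepted: `HList![String]` is a prefix of `HList![String, String]`.
    assert_prefix::<frunk::HList![String], frunk::HList![String, String]>();

    // Rejected at compile time if uncommented (head types differ):
    // assert_prefix::<frunk::HList![u8], frunk::HList![String, String]>();
}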
@@ -1,13 +1,48 @@
 use std::{
-    borrow::Borrow, collections::BTreeMap, marker::PhantomData, sync::RwLock,
+    borrow::Borrow, collections::BTreeMap, marker::PhantomData,
+    mem::ManuallyDrop, sync::RwLock,
 };
 
 use frunk::{hlist, HList};
 use futures_util::{stream, Stream, StreamExt};
 
 use super::{FromBytes, Map, MapError, ToBytes};
 
 mod conversions;
 
+struct Iter<T, I> {
+    inner: ManuallyDrop<I>,
+    guard_ref: *mut T,
+}
+
+impl<T, I> Drop for Iter<T, I> {
+    fn drop(&mut self) {
+        // SAFETY: The following things must be true for this to be sound:
+        //
+        // * `inner`'s `Iterator` impl reads into memory held by `guard_ref` so
+        //   the former must be dropped first
+        // * `Self` must not impl `Clone` or else `guard_ref` could get
+        //   double-free'd
+        // * `guard_ref` must be constructed by `Box::leak` for `Box::from_raw`
+        //   to work
+        unsafe {
+            ManuallyDrop::drop(&mut self.inner);
+            drop(Box::from_raw(self.guard_ref));
+        }
+    }
+}
+
+impl<T, I> Iterator for Iter<T, I>
+where
+    I: Iterator,
+{
+    type Item = I::Item;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        self.inner.next()
+    }
+}
+
 struct TestMap<K, V> {
     storage: RwLock<BTreeMap<Vec<u8>, Vec<u8>>>,
     types: PhantomData<(K, V)>,
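Aside: `Iter` exists because `scan_prefix` (next hunk) must return an owned stream while the underlying `BTreeMap` iterator borrows from the `RwLock` read guard. The trick is to leak the guard with `Box::leak` so the iterator can borrow it for `'static`, keep a raw pointer to the leaked allocation, and reclaim it in `Drop` only after the iterator is gone. A self-contained sketch of the same pattern, with a `String` and its `Chars` iterator as hypothetical stand-ins for the guard and the map iterator:

use std::mem::ManuallyDrop;

struct OwnedChars {
    // Must be dropped before `owner` is reclaimed, hence `ManuallyDrop`.
    inner: ManuallyDrop<std::str::Chars<'static>>,
    owner: *mut String,
}

impl OwnedChars {
    fn new(s: String) -> Self {
        // Leak the owner so the borrow can be 'static; keep a raw pointer
        // to reclaim the allocation later.
        let owner: &'static mut String = Box::leak(Box::new(s));
        let owner_ptr: *mut String = owner;
        Self { inner: ManuallyDrop::new(owner.chars()), owner: owner_ptr }
    }
}

impl Iterator for OwnedChars {
    type Item = char;

    fn next(&mut self) -> Option<char> {
        self.inner.next()
    }
}

impl Drop for OwnedChars {
    fn drop(&mut self) {
        // SAFETY: same invariants as the patch: drop the borrowing iterator
        // first, then reclaim the leaked owner exactly once (no `Clone`).
        unsafe {
            ManuallyDrop::drop(&mut self.inner);
            drop(Box::from_raw(self.owner));
        }
    }
}

fn main() {
    let chars: Vec<char> = OwnedChars::new("hey".to_owned()).collect();
    assert_eq!(chars, ['h', 'e', 'y']);
}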
@@ -73,6 +108,46 @@ where
 
         Ok(())
     }
+
+    #[rustfmt::skip]
+    async fn scan_prefix<P>(
+        &self,
+        key: &P,
+    ) -> Result<
+        impl Stream<
+            Item = (Result<Self::Key, MapError>, Result<Self::Value, MapError>)
+        >,
+        MapError,
+    >
+    where
+        P: ToBytes,
+    {
+        let guard = self
+            .storage
+            .read()
+            .expect("lock should not be poisoned");
+
+        let guard = Box::leak(Box::new(guard));
+
+        let guard_ref: *mut _ = guard;
+
+        let inner = guard
+            .iter()
+            .filter(|(kb, _)| kb.starts_with(key.borrow().to_bytes().as_ref()))
+            .map(|(kb, vb)| {
+                (
+                    Self::Key::from_bytes(kb.to_owned())
+                        .map_err(MapError::FromBytes),
+                    Self::Value::from_bytes(vb.to_owned())
+                        .map_err(MapError::FromBytes),
+                )
+            });
+
+        Ok(stream::iter(Iter {
+            inner: ManuallyDrop::new(inner),
+            guard_ref,
+        }))
+    }
 }
 
 #[tokio::test]
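Aside: the impl's `where P: ToBytes` is weaker than the trait's `P: ToBytes + IsPrefixOf<Self::Key>`, which Rust permits (an impl may be less strict than its trait declaration), and correctness of the byte-level `starts_with` filter then rests on the encoding mapping an hlist prefix to a byte prefix of the full encoded key. Note also that each stream item carries two `Result`s, so key and value decoding fail independently. A sketch of a hypothetical caller of this test-map implementation, assuming access to `TestMap`, its `new`/`set` helpers, and `MapError` in scope:

use frunk::{hlist, HList};
use futures_util::StreamExt;

async fn dump_hellos() -> Result<(), MapError> {
    let map = TestMap::<HList![String, String], HList![String, String]>::new();
    map.set(
        &hlist!["hello".to_owned(), "world".to_owned()],
        &hlist!["test".to_owned(), "suite".to_owned()],
    )
    .await?;

    // Scan everything whose first key component is "hello".
    let mut stream = map.scan_prefix(&hlist!["hello".to_owned()]).await?;
    while let Some((key, value)) = stream.next().await {
        // Key and value decode independently; each can fail on its own.
        println!("{:?} => {:?}", key?, value?);
    }
    Ok(())
}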
@@ -115,3 +190,57 @@ async fn hlist_to_hlist() {
 
     assert_eq!(None, actual_value);
 }
+
+#[tokio::test]
+async fn hlist_scan_prefix() {
+    let test_map =
+        TestMap::<HList![String, String], HList![String, String]>::new();
+
+    let key = hlist!["hello".to_owned(), "world".to_owned()];
+    let value = hlist!["test".to_owned(), "suite".to_owned()];
+    test_map.set(&key, &value).await.expect("insertion should succeed");
+
+    let key = hlist!["hello".to_owned(), "debugger".to_owned()];
+    let value = hlist!["tester".to_owned(), "suiter".to_owned()];
+    test_map.set(&key, &value).await.expect("insertion should succeed");
+
+    let key = hlist!["shouldn't".to_owned(), "appear".to_owned()];
+    let value = hlist!["in".to_owned(), "assertions".to_owned()];
+    test_map.set(&key, &value).await.expect("insertion should succeed");
+
+    let prefix = hlist!["hello".to_owned()];
+    let mut stream = test_map
+        .scan_prefix(&prefix)
+        .await
+        .expect("scanning should succeed")
+        .enumerate();
+    while let Some((i, next)) = stream.next().await {
+        let (key, value) = next;
+        let (key, value) = (
+            key.expect("key decoding should succeed"),
+            value.expect("value decoding should succeed"),
+        );
+
+        // Ordering is guaranteed because `BTreeMap` keys are sorted
+        match i {
+            0 => {
+                assert_eq!(
+                    key,
+                    hlist!["hello".to_owned(), "debugger".to_owned()]
+                );
+                assert_eq!(
+                    value,
+                    hlist!["tester".to_owned(), "suiter".to_owned()]
+                );
+            }
+            1 => {
+                assert_eq!(key, hlist!["hello".to_owned(), "world".to_owned()]);
+                assert_eq!(
+                    value,
+                    hlist!["test".to_owned(), "suite".to_owned()]
+                );
+            }
+            _ => unreachable!(),
+        }
+    }
+}
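Aside: the fixed `0`/`1` indices in the match rely on `BTreeMap` iterating keys in ascending byte order. Both matching keys share the `"hello"` component, so the second component decides, and `"debugger"` sorts before `"world"`, assuming the hlist encoding preserves component order. A quick sanity check of that comparison:

fn main() {
    // Lexicographic byte-wise comparison, the order `BTreeMap` uses for
    // `Vec<u8>` keys.
    assert!("debugger".as_bytes() < "world".as_bytes());
}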