From e294543ddb44e536f80f2d19bb9de2556792b126 Mon Sep 17 00:00:00 2001 From: Lambda Date: Wed, 22 May 2024 18:28:26 +0000 Subject: [PATCH] sending.rs: add RequestKey Much easier to reason about than with a bunch of Vec everywhere. --- src/database/key_value/sending.rs | 39 +++++++++++++++++++------------ src/service/sending.rs | 21 +++++++++++++---- src/service/sending/data.rs | 17 +++++++------- 3 files changed, 50 insertions(+), 27 deletions(-) diff --git a/src/database/key_value/sending.rs b/src/database/key_value/sending.rs index 29ba6fd7..a03b2596 100644 --- a/src/database/key_value/sending.rs +++ b/src/database/key_value/sending.rs @@ -4,7 +4,7 @@ use crate::{ database::KeyValueDatabase, service::{ self, - sending::{OutgoingKind, SendingEventType}, + sending::{OutgoingKind, RequestKey, SendingEventType}, }, services, utils, Error, Result, }; @@ -13,10 +13,12 @@ impl service::sending::Data for KeyValueDatabase { fn active_requests<'a>( &'a self, ) -> Box< - dyn Iterator, OutgoingKind, SendingEventType)>> - + 'a, + dyn Iterator< + Item = Result<(RequestKey, OutgoingKind, SendingEventType)>, + > + 'a, > { Box::new(self.servercurrentevent_data.iter().map(|(key, v)| { + let key = RequestKey::new(key); parse_servercurrentevent(&key, v).map(|(k, e)| (key, k, e)) })) } @@ -24,16 +26,19 @@ impl service::sending::Data for KeyValueDatabase { fn active_requests_for<'a>( &'a self, outgoing_kind: &OutgoingKind, - ) -> Box, SendingEventType)>> + 'a> + ) -> Box> + 'a> { let prefix = outgoing_kind.get_prefix(); Box::new(self.servercurrentevent_data.scan_prefix(prefix).map( - |(key, v)| parse_servercurrentevent(&key, v).map(|(_, e)| (key, e)), + |(key, v)| { + let key = RequestKey::new(key); + parse_servercurrentevent(&key, v).map(|(_, e)| (key, e)) + }, )) } - fn delete_active_request(&self, key: Vec) -> Result<()> { - self.servercurrentevent_data.remove(&key) + fn delete_active_request(&self, key: RequestKey) -> Result<()> { + self.servercurrentevent_data.remove(key.as_bytes()) } fn delete_all_active_requests_for( @@ -51,7 +56,7 @@ impl service::sending::Data for KeyValueDatabase { fn queue_requests( &self, requests: &[(&OutgoingKind, SendingEventType)], - ) -> Result>> { + ) -> Result> { let mut batch = Vec::new(); let mut keys = Vec::new(); for (outgoing_kind, event) in requests { @@ -69,7 +74,7 @@ impl service::sending::Data for KeyValueDatabase { &[] }; batch.push((key.clone(), value.to_owned())); - keys.push(key); + keys.push(RequestKey::new(key)); } self.servernameevent_data.insert_batch(&mut batch.into_iter())?; Ok(keys) @@ -78,17 +83,20 @@ impl service::sending::Data for KeyValueDatabase { fn queued_requests<'a>( &'a self, outgoing_kind: &OutgoingKind, - ) -> Box)>> + 'a> + ) -> Box> + 'a> { let prefix = outgoing_kind.get_prefix(); return Box::new(self.servernameevent_data.scan_prefix(prefix).map( - |(k, v)| parse_servercurrentevent(&k, v).map(|(_, ev)| (ev, k)), + |(k, v)| { + let k = RequestKey::new(k); + parse_servercurrentevent(&k, v).map(|(_, ev)| (ev, k)) + }, )); } fn mark_as_active( &self, - events: &[(SendingEventType, Vec)], + events: &[(SendingEventType, RequestKey)], ) -> Result<()> { for (e, key) in events { let value = if let SendingEventType::Edu(value) = &e { @@ -96,8 +104,8 @@ impl service::sending::Data for KeyValueDatabase { } else { &[] }; - self.servercurrentevent_data.insert(key, value)?; - self.servernameevent_data.remove(key)?; + self.servercurrentevent_data.insert(key.as_bytes(), value)?; + self.servernameevent_data.remove(key.as_bytes())?; } Ok(()) @@ -126,9 +134,10 @@ impl service::sending::Data for KeyValueDatabase { #[tracing::instrument(skip(key))] fn parse_servercurrentevent( - key: &[u8], + key: &RequestKey, value: Vec, ) -> Result<(OutgoingKind, SendingEventType)> { + let key = key.as_bytes(); // Appservices start with a plus Ok::<_, Error>(if key.starts_with(b"+") { let mut parts = key[1..].splitn(2, |&b| b == 0xFF); diff --git a/src/service/sending.rs b/src/service/sending.rs index c0fd0fd9..7f994e3a 100644 --- a/src/service/sending.rs +++ b/src/service/sending.rs @@ -88,15 +88,28 @@ pub(crate) enum SendingEventType { Edu(Vec), } +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub(crate) struct RequestKey(Vec); + +impl RequestKey { + pub(crate) fn new(key: Vec) -> Self { + Self(key) + } + + pub(crate) fn as_bytes(&self) -> &[u8] { + &self.0 + } +} + pub(crate) struct Service { db: &'static dyn Data, /// The state for a given state hash. pub(super) maximum_requests: Arc, pub(crate) sender: - mpsc::UnboundedSender<(OutgoingKind, SendingEventType, Vec)>, + mpsc::UnboundedSender<(OutgoingKind, SendingEventType, RequestKey)>, receiver: Mutex< - mpsc::UnboundedReceiver<(OutgoingKind, SendingEventType, Vec)>, + mpsc::UnboundedReceiver<(OutgoingKind, SendingEventType, RequestKey)>, >, } @@ -284,7 +297,7 @@ impl Service { &self, outgoing_kind: OutgoingKind, event: SendingEventType, - key: Vec, + key: RequestKey, current_transaction_status: &mut TransactionStatusMap, ) -> Option { if let Ok(Some(events)) = self.select_events( @@ -312,7 +325,7 @@ impl Service { &self, outgoing_kind: &OutgoingKind, // Events we want to send: event and full key - new_events: Vec<(SendingEventType, Vec)>, + new_events: Vec<(SendingEventType, RequestKey)>, current_transaction_status: &mut HashMap< OutgoingKind, TransactionStatus, diff --git a/src/service/sending/data.rs b/src/service/sending/data.rs index 040f9715..ad9915e5 100644 --- a/src/service/sending/data.rs +++ b/src/service/sending/data.rs @@ -1,6 +1,6 @@ use ruma::ServerName; -use super::{OutgoingKind, SendingEventType}; +use super::{OutgoingKind, RequestKey, SendingEventType}; use crate::Result; pub(crate) trait Data: Send + Sync { @@ -8,14 +8,15 @@ pub(crate) trait Data: Send + Sync { fn active_requests<'a>( &'a self, ) -> Box< - dyn Iterator, OutgoingKind, SendingEventType)>> - + 'a, + dyn Iterator< + Item = Result<(RequestKey, OutgoingKind, SendingEventType)>, + > + 'a, >; fn active_requests_for<'a>( &'a self, outgoing_kind: &OutgoingKind, - ) -> Box, SendingEventType)>> + 'a>; - fn delete_active_request(&self, key: Vec) -> Result<()>; + ) -> Box> + 'a>; + fn delete_active_request(&self, key: RequestKey) -> Result<()>; fn delete_all_active_requests_for( &self, outgoing_kind: &OutgoingKind, @@ -23,14 +24,14 @@ pub(crate) trait Data: Send + Sync { fn queue_requests( &self, requests: &[(&OutgoingKind, SendingEventType)], - ) -> Result>>; + ) -> Result>; fn queued_requests<'a>( &'a self, outgoing_kind: &OutgoingKind, - ) -> Box)>> + 'a>; + ) -> Box> + 'a>; fn mark_as_active( &self, - events: &[(SendingEventType, Vec)], + events: &[(SendingEventType, RequestKey)], ) -> Result<()>; fn set_latest_educount( &self,