sending.rs: rename OutgoingKind to Destination

That's what it is. It describes the destination of the event.
This commit is contained in:
Lambda 2024-05-22 21:19:55 +00:00
parent 9071e11e06
commit 18992b4d1d
3 changed files with 96 additions and 107 deletions

View file

@ -4,7 +4,7 @@ use crate::{
database::KeyValueDatabase, database::KeyValueDatabase,
service::{ service::{
self, self,
sending::{OutgoingKind, RequestKey, SendingEventType}, sending::{Destination, RequestKey, SendingEventType},
}, },
services, utils, Error, Result, services, utils, Error, Result,
}; };
@ -13,9 +13,8 @@ impl service::sending::Data for KeyValueDatabase {
fn active_requests<'a>( fn active_requests<'a>(
&'a self, &'a self,
) -> Box< ) -> Box<
dyn Iterator< dyn Iterator<Item = Result<(RequestKey, Destination, SendingEventType)>>
Item = Result<(RequestKey, OutgoingKind, SendingEventType)>, + 'a,
> + 'a,
> { > {
Box::new(self.servercurrentevent_data.iter().map(|(key, v)| { Box::new(self.servercurrentevent_data.iter().map(|(key, v)| {
let key = RequestKey::new(key); let key = RequestKey::new(key);
@ -25,10 +24,10 @@ impl service::sending::Data for KeyValueDatabase {
fn active_requests_for<'a>( fn active_requests_for<'a>(
&'a self, &'a self,
outgoing_kind: &OutgoingKind, destination: &Destination,
) -> Box<dyn Iterator<Item = Result<(RequestKey, SendingEventType)>> + 'a> ) -> Box<dyn Iterator<Item = Result<(RequestKey, SendingEventType)>> + 'a>
{ {
let prefix = outgoing_kind.get_prefix(); let prefix = destination.get_prefix();
Box::new(self.servercurrentevent_data.scan_prefix(prefix).map( Box::new(self.servercurrentevent_data.scan_prefix(prefix).map(
|(key, v)| { |(key, v)| {
let key = RequestKey::new(key); let key = RequestKey::new(key);
@ -43,9 +42,9 @@ impl service::sending::Data for KeyValueDatabase {
fn delete_all_active_requests_for( fn delete_all_active_requests_for(
&self, &self,
outgoing_kind: &OutgoingKind, destination: &Destination,
) -> Result<()> { ) -> Result<()> {
let prefix = outgoing_kind.get_prefix(); let prefix = destination.get_prefix();
for (key, _) in self.servercurrentevent_data.scan_prefix(prefix) { for (key, _) in self.servercurrentevent_data.scan_prefix(prefix) {
self.servercurrentevent_data.remove(&key)?; self.servercurrentevent_data.remove(&key)?;
} }
@ -55,12 +54,12 @@ impl service::sending::Data for KeyValueDatabase {
fn queue_requests( fn queue_requests(
&self, &self,
requests: &[(&OutgoingKind, SendingEventType)], requests: &[(&Destination, SendingEventType)],
) -> Result<Vec<RequestKey>> { ) -> Result<Vec<RequestKey>> {
let mut batch = Vec::new(); let mut batch = Vec::new();
let mut keys = Vec::new(); let mut keys = Vec::new();
for (outgoing_kind, event) in requests { for (destination, event) in requests {
let mut key = outgoing_kind.get_prefix(); let mut key = destination.get_prefix();
if let SendingEventType::Pdu(value) = &event { if let SendingEventType::Pdu(value) = &event {
key.extend_from_slice(value); key.extend_from_slice(value);
} else { } else {
@ -82,10 +81,10 @@ impl service::sending::Data for KeyValueDatabase {
fn queued_requests<'a>( fn queued_requests<'a>(
&'a self, &'a self,
outgoing_kind: &OutgoingKind, destination: &Destination,
) -> Box<dyn Iterator<Item = Result<(SendingEventType, RequestKey)>> + 'a> ) -> Box<dyn Iterator<Item = Result<(SendingEventType, RequestKey)>> + 'a>
{ {
let prefix = outgoing_kind.get_prefix(); let prefix = destination.get_prefix();
return Box::new(self.servernameevent_data.scan_prefix(prefix).map( return Box::new(self.servernameevent_data.scan_prefix(prefix).map(
|(k, v)| { |(k, v)| {
let k = RequestKey::new(k); let k = RequestKey::new(k);
@ -136,7 +135,7 @@ impl service::sending::Data for KeyValueDatabase {
fn parse_servercurrentevent( fn parse_servercurrentevent(
key: &RequestKey, key: &RequestKey,
value: Vec<u8>, value: Vec<u8>,
) -> Result<(OutgoingKind, SendingEventType)> { ) -> Result<(Destination, SendingEventType)> {
let key = key.as_bytes(); let key = key.as_bytes();
// Appservices start with a plus // Appservices start with a plus
Ok::<_, Error>(if key.starts_with(b"+") { Ok::<_, Error>(if key.starts_with(b"+") {
@ -154,7 +153,7 @@ fn parse_servercurrentevent(
})?; })?;
( (
OutgoingKind::Appservice(server), Destination::Appservice(server),
if value.is_empty() { if value.is_empty() {
SendingEventType::Pdu(event.to_vec()) SendingEventType::Pdu(event.to_vec())
} else { } else {
@ -185,7 +184,7 @@ fn parse_servercurrentevent(
})?; })?;
( (
OutgoingKind::Push(user_id, pushkey_string), Destination::Push(user_id, pushkey_string),
if value.is_empty() { if value.is_empty() {
SendingEventType::Pdu(event.to_vec()) SendingEventType::Pdu(event.to_vec())
} else { } else {
@ -208,7 +207,7 @@ fn parse_servercurrentevent(
})?; })?;
( (
OutgoingKind::Normal(ServerName::parse(server).map_err(|_| { Destination::Normal(ServerName::parse(server).map_err(|_| {
Error::bad_database( Error::bad_database(
"Invalid server string in server_currenttransaction", "Invalid server string in server_currenttransaction",
) )

View file

@ -45,30 +45,30 @@ use crate::{
}; };
#[derive(Clone, Debug, PartialEq, Eq, Hash)] #[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub(crate) enum OutgoingKind { pub(crate) enum Destination {
Appservice(String), Appservice(String),
// user and pushkey // user and pushkey
Push(OwnedUserId, String), Push(OwnedUserId, String),
Normal(OwnedServerName), Normal(OwnedServerName),
} }
impl OutgoingKind { impl Destination {
#[tracing::instrument(skip(self))] #[tracing::instrument(skip(self))]
pub(crate) fn get_prefix(&self) -> Vec<u8> { pub(crate) fn get_prefix(&self) -> Vec<u8> {
let mut prefix = match self { let mut prefix = match self {
OutgoingKind::Appservice(server) => { Destination::Appservice(server) => {
let mut p = b"+".to_vec(); let mut p = b"+".to_vec();
p.extend_from_slice(server.as_bytes()); p.extend_from_slice(server.as_bytes());
p p
} }
OutgoingKind::Push(user, pushkey) => { Destination::Push(user, pushkey) => {
let mut p = b"$".to_vec(); let mut p = b"$".to_vec();
p.extend_from_slice(user.as_bytes()); p.extend_from_slice(user.as_bytes());
p.push(0xFF); p.push(0xFF);
p.extend_from_slice(pushkey.as_bytes()); p.extend_from_slice(pushkey.as_bytes());
p p
} }
OutgoingKind::Normal(server) => { Destination::Normal(server) => {
let mut p = Vec::new(); let mut p = Vec::new();
p.extend_from_slice(server.as_bytes()); p.extend_from_slice(server.as_bytes());
p p
@ -102,7 +102,7 @@ impl RequestKey {
} }
pub(crate) struct RequestData { pub(crate) struct RequestData {
outgoing_kind: OutgoingKind, destination: Destination,
event_type: SendingEventType, event_type: SendingEventType,
key: RequestKey, key: RequestKey,
} }
@ -126,18 +126,18 @@ enum TransactionStatus {
} }
struct HandlerInputs { struct HandlerInputs {
kind: OutgoingKind, destination: Destination,
events: Vec<SendingEventType>, events: Vec<SendingEventType>,
} }
type HandlerResponse = Result<OutgoingKind, (OutgoingKind, Error)>; type HandlerResponse = Result<Destination, (Destination, Error)>;
fn outgoing_kind_from_response(response: &HandlerResponse) -> &OutgoingKind { fn destination_from_response(response: &HandlerResponse) -> &Destination {
match response { match response {
Ok(kind) | Err((kind, _)) => kind, Ok(kind) | Err((kind, _)) => kind,
} }
} }
type TransactionStatusMap = HashMap<OutgoingKind, TransactionStatus>; type TransactionStatusMap = HashMap<Destination, TransactionStatus>;
impl Service { impl Service {
pub(crate) fn build(db: &'static dyn Data, config: &Config) -> Arc<Self> { pub(crate) fn build(db: &'static dyn Data, config: &Config) -> Arc<Self> {
@ -168,18 +168,18 @@ impl Service {
// Retry requests we could not finish yet // Retry requests we could not finish yet
let mut initial_transactions = let mut initial_transactions =
HashMap::<OutgoingKind, Vec<SendingEventType>>::new(); HashMap::<Destination, Vec<SendingEventType>>::new();
for (key, outgoing_kind, event) in for (key, destination, event) in
self.db.active_requests().filter_map(Result::ok) self.db.active_requests().filter_map(Result::ok)
{ {
let entry = let entry =
initial_transactions.entry(outgoing_kind.clone()).or_default(); initial_transactions.entry(destination.clone()).or_default();
if entry.len() > 30 { if entry.len() > 30 {
warn!( warn!(
"Dropping some current events: {:?} {:?} {:?}", "Dropping some current events: {:?} {:?} {:?}",
key, outgoing_kind, event key, destination, event
); );
self.db.delete_active_request(key)?; self.db.delete_active_request(key)?;
continue; continue;
@ -188,33 +188,33 @@ impl Service {
entry.push(event); entry.push(event);
} }
for (outgoing_kind, events) in initial_transactions { for (destination, events) in initial_transactions {
current_transaction_status current_transaction_status
.insert(outgoing_kind.clone(), TransactionStatus::Running); .insert(destination.clone(), TransactionStatus::Running);
futures.push(handle_events(outgoing_kind.clone(), events)); futures.push(handle_events(destination.clone(), events));
} }
loop { loop {
select! { select! {
Some(response) = futures.next() => { Some(response) = futures.next() => {
if let Some(HandlerInputs { if let Some(HandlerInputs {
kind, destination,
events, events,
}) = self.handle_response( }) = self.handle_response(
response, response,
&mut current_transaction_status, &mut current_transaction_status,
)? { )? {
futures.push(handle_events(kind, events)); futures.push(handle_events(destination, events));
} }
} }
Some(data) = receiver.recv() => { Some(data) = receiver.recv() => {
if let Some(HandlerInputs { if let Some(HandlerInputs {
kind, destination,
events, events,
}) = self }) = self
.handle_receiver(data, &mut current_transaction_status) .handle_receiver(data, &mut current_transaction_status)
{ {
futures.push(handle_events(kind, events)); futures.push(handle_events(destination, events));
} }
} }
} }
@ -225,7 +225,7 @@ impl Service {
skip(self, current_transaction_status), skip(self, current_transaction_status),
fields( fields(
current_status = ?current_transaction_status.get( current_status = ?current_transaction_status.get(
outgoing_kind_from_response(&response) destination_from_response(&response)
), ),
), ),
)] )]
@ -235,27 +235,27 @@ impl Service {
current_transaction_status: &mut TransactionStatusMap, current_transaction_status: &mut TransactionStatusMap,
) -> Result<Option<HandlerInputs>> { ) -> Result<Option<HandlerInputs>> {
match response { match response {
Ok(outgoing_kind) => { Ok(destination) => {
self.db.delete_all_active_requests_for(&outgoing_kind)?; self.db.delete_all_active_requests_for(&destination)?;
// Find events that have been added since starting the // Find events that have been added since starting the
// last request // last request
let new_events = self let new_events = self
.db .db
.queued_requests(&outgoing_kind) .queued_requests(&destination)
.filter_map(Result::ok) .filter_map(Result::ok)
.take(30) .take(30)
.collect::<Vec<_>>(); .collect::<Vec<_>>();
if new_events.is_empty() { if new_events.is_empty() {
current_transaction_status.remove(&outgoing_kind); current_transaction_status.remove(&destination);
Ok(None) Ok(None)
} else { } else {
// Insert pdus we found // Insert pdus we found
self.db.mark_as_active(&new_events)?; self.db.mark_as_active(&new_events)?;
Ok(Some(HandlerInputs { Ok(Some(HandlerInputs {
kind: outgoing_kind.clone(), destination: destination.clone(),
events: new_events events: new_events
.into_iter() .into_iter()
.map(|(event, _)| event) .map(|(event, _)| event)
@ -263,29 +263,23 @@ impl Service {
})) }))
} }
} }
Err((outgoing_kind, _)) => { Err((destination, _)) => {
current_transaction_status.entry(outgoing_kind).and_modify( current_transaction_status.entry(destination).and_modify(|e| {
|e| { *e = match e {
*e = match e { TransactionStatus::Running => {
TransactionStatus::Running => { TransactionStatus::Failed(1, Instant::now())
TransactionStatus::Failed(1, Instant::now())
}
TransactionStatus::Retrying(n) => {
TransactionStatus::Failed(
*n + 1,
Instant::now(),
)
}
TransactionStatus::Failed(..) => {
error!(
"Request that was not even running \
failed?!"
);
return;
}
} }
}, TransactionStatus::Retrying(n) => {
); TransactionStatus::Failed(*n + 1, Instant::now())
}
TransactionStatus::Failed(..) => {
error!(
"Request that was not even running failed?!"
);
return;
}
}
});
Ok(None) Ok(None)
} }
} }
@ -294,25 +288,25 @@ impl Service {
#[tracing::instrument( #[tracing::instrument(
skip(self, event_type, key, current_transaction_status), skip(self, event_type, key, current_transaction_status),
fields( fields(
current_status = ?current_transaction_status.get(&outgoing_kind), current_status = ?current_transaction_status.get(&destination),
), ),
)] )]
fn handle_receiver( fn handle_receiver(
&self, &self,
RequestData { RequestData {
outgoing_kind, destination,
event_type, event_type,
key, key,
}: RequestData, }: RequestData,
current_transaction_status: &mut TransactionStatusMap, current_transaction_status: &mut TransactionStatusMap,
) -> Option<HandlerInputs> { ) -> Option<HandlerInputs> {
if let Ok(Some(events)) = self.select_events( if let Ok(Some(events)) = self.select_events(
&outgoing_kind, &destination,
vec![(event_type, key)], vec![(event_type, key)],
current_transaction_status, current_transaction_status,
) { ) {
Some(HandlerInputs { Some(HandlerInputs {
kind: outgoing_kind, destination,
events, events,
}) })
} else { } else {
@ -324,23 +318,23 @@ impl Service {
skip(self, new_events, current_transaction_status), skip(self, new_events, current_transaction_status),
fields( fields(
new_events = debug_slice_truncated(&new_events, 3), new_events = debug_slice_truncated(&new_events, 3),
current_status = ?current_transaction_status.get(outgoing_kind), current_status = ?current_transaction_status.get(destination),
), ),
)] )]
fn select_events( fn select_events(
&self, &self,
outgoing_kind: &OutgoingKind, destination: &Destination,
// Events we want to send: event and full key // Events we want to send: event and full key
new_events: Vec<(SendingEventType, RequestKey)>, new_events: Vec<(SendingEventType, RequestKey)>,
current_transaction_status: &mut HashMap< current_transaction_status: &mut HashMap<
OutgoingKind, Destination,
TransactionStatus, TransactionStatus,
>, >,
) -> Result<Option<Vec<SendingEventType>>> { ) -> Result<Option<Vec<SendingEventType>>> {
let mut retry = false; let mut retry = false;
let mut allow = true; let mut allow = true;
let entry = current_transaction_status.entry(outgoing_kind.clone()); let entry = current_transaction_status.entry(destination.clone());
entry entry
.and_modify(|e| match e { .and_modify(|e| match e {
@ -377,10 +371,8 @@ impl Service {
if retry { if retry {
// We retry the previous transaction // We retry the previous transaction
for (_, e) in self for (_, e) in
.db self.db.active_requests_for(destination).filter_map(Result::ok)
.active_requests_for(outgoing_kind)
.filter_map(Result::ok)
{ {
events.push(e); events.push(e);
} }
@ -390,7 +382,7 @@ impl Service {
events.push(e); events.push(e);
} }
if let OutgoingKind::Normal(server_name) = outgoing_kind { if let Destination::Normal(server_name) = destination {
if let Ok((select_edus, last_count)) = if let Ok((select_edus, last_count)) =
self.select_edus(server_name) self.select_edus(server_name)
{ {
@ -536,13 +528,13 @@ impl Service {
user: &UserId, user: &UserId,
pushkey: String, pushkey: String,
) -> Result<()> { ) -> Result<()> {
let outgoing_kind = OutgoingKind::Push(user.to_owned(), pushkey); let destination = Destination::Push(user.to_owned(), pushkey);
let event_type = SendingEventType::Pdu(pdu_id.to_owned()); let event_type = SendingEventType::Pdu(pdu_id.to_owned());
let keys = let keys =
self.db.queue_requests(&[(&outgoing_kind, event_type.clone())])?; self.db.queue_requests(&[(&destination, event_type.clone())])?;
self.sender self.sender
.send(RequestData { .send(RequestData {
outgoing_kind, destination,
event_type, event_type,
key: keys.into_iter().next().unwrap(), key: keys.into_iter().next().unwrap(),
}) })
@ -561,7 +553,7 @@ impl Service {
.into_iter() .into_iter()
.map(|server| { .map(|server| {
( (
OutgoingKind::Normal(server), Destination::Normal(server),
SendingEventType::Pdu(pdu_id.to_owned()), SendingEventType::Pdu(pdu_id.to_owned()),
) )
}) })
@ -569,11 +561,10 @@ impl Service {
let keys = self.db.queue_requests( let keys = self.db.queue_requests(
&requests.iter().map(|(o, e)| (o, e.clone())).collect::<Vec<_>>(), &requests.iter().map(|(o, e)| (o, e.clone())).collect::<Vec<_>>(),
)?; )?;
for ((outgoing_kind, event_type), key) in requests.into_iter().zip(keys) for ((destination, event_type), key) in requests.into_iter().zip(keys) {
{
self.sender self.sender
.send(RequestData { .send(RequestData {
outgoing_kind: outgoing_kind.clone(), destination: destination.clone(),
event_type, event_type,
key, key,
}) })
@ -590,13 +581,13 @@ impl Service {
serialized: Vec<u8>, serialized: Vec<u8>,
id: u64, id: u64,
) -> Result<()> { ) -> Result<()> {
let outgoing_kind = OutgoingKind::Normal(server.to_owned()); let destination = Destination::Normal(server.to_owned());
let event_type = SendingEventType::Edu(serialized); let event_type = SendingEventType::Edu(serialized);
let keys = let keys =
self.db.queue_requests(&[(&outgoing_kind, event_type.clone())])?; self.db.queue_requests(&[(&destination, event_type.clone())])?;
self.sender self.sender
.send(RequestData { .send(RequestData {
outgoing_kind, destination,
event_type, event_type,
key: keys.into_iter().next().unwrap(), key: keys.into_iter().next().unwrap(),
}) })
@ -611,13 +602,13 @@ impl Service {
appservice_id: String, appservice_id: String,
pdu_id: Vec<u8>, pdu_id: Vec<u8>,
) -> Result<()> { ) -> Result<()> {
let outgoing_kind = OutgoingKind::Appservice(appservice_id); let destination = Destination::Appservice(appservice_id);
let event_type = SendingEventType::Pdu(pdu_id); let event_type = SendingEventType::Pdu(pdu_id);
let keys = let keys =
self.db.queue_requests(&[(&outgoing_kind, event_type.clone())])?; self.db.queue_requests(&[(&destination, event_type.clone())])?;
self.sender self.sender
.send(RequestData { .send(RequestData {
outgoing_kind, destination,
event_type, event_type,
key: keys.into_iter().next().unwrap(), key: keys.into_iter().next().unwrap(),
}) })
@ -892,23 +883,23 @@ async fn handle_federation_event(
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all)]
async fn handle_events( async fn handle_events(
kind: OutgoingKind, destination: Destination,
events: Vec<SendingEventType>, events: Vec<SendingEventType>,
) -> HandlerResponse { ) -> HandlerResponse {
let ret = match &kind { let ret = match &destination {
OutgoingKind::Appservice(id) => { Destination::Appservice(id) => {
handle_appservice_event(id, events).await handle_appservice_event(id, events).await
} }
OutgoingKind::Push(userid, pushkey) => { Destination::Push(userid, pushkey) => {
handle_push_event(userid, pushkey, events).await handle_push_event(userid, pushkey, events).await
} }
OutgoingKind::Normal(server) => { Destination::Normal(server) => {
handle_federation_event(server, events).await handle_federation_event(server, events).await
} }
}; };
match ret { match ret {
Ok(()) => Ok(kind), Ok(()) => Ok(destination),
Err(e) => Err((kind, e)), Err(e) => Err((destination, e)),
} }
} }

View file

@ -1,6 +1,6 @@
use ruma::ServerName; use ruma::ServerName;
use super::{OutgoingKind, RequestKey, SendingEventType}; use super::{Destination, RequestKey, SendingEventType};
use crate::Result; use crate::Result;
pub(crate) trait Data: Send + Sync { pub(crate) trait Data: Send + Sync {
@ -8,26 +8,25 @@ pub(crate) trait Data: Send + Sync {
fn active_requests<'a>( fn active_requests<'a>(
&'a self, &'a self,
) -> Box< ) -> Box<
dyn Iterator< dyn Iterator<Item = Result<(RequestKey, Destination, SendingEventType)>>
Item = Result<(RequestKey, OutgoingKind, SendingEventType)>, + 'a,
> + 'a,
>; >;
fn active_requests_for<'a>( fn active_requests_for<'a>(
&'a self, &'a self,
outgoing_kind: &OutgoingKind, destination: &Destination,
) -> Box<dyn Iterator<Item = Result<(RequestKey, SendingEventType)>> + 'a>; ) -> Box<dyn Iterator<Item = Result<(RequestKey, SendingEventType)>> + 'a>;
fn delete_active_request(&self, key: RequestKey) -> Result<()>; fn delete_active_request(&self, key: RequestKey) -> Result<()>;
fn delete_all_active_requests_for( fn delete_all_active_requests_for(
&self, &self,
outgoing_kind: &OutgoingKind, destination: &Destination,
) -> Result<()>; ) -> Result<()>;
fn queue_requests( fn queue_requests(
&self, &self,
requests: &[(&OutgoingKind, SendingEventType)], requests: &[(&Destination, SendingEventType)],
) -> Result<Vec<RequestKey>>; ) -> Result<Vec<RequestKey>>;
fn queued_requests<'a>( fn queued_requests<'a>(
&'a self, &'a self,
outgoing_kind: &OutgoingKind, destination: &Destination,
) -> Box<dyn Iterator<Item = Result<(SendingEventType, RequestKey)>> + 'a>; ) -> Box<dyn Iterator<Item = Result<(SendingEventType, RequestKey)>> + 'a>;
fn mark_as_active( fn mark_as_active(
&self, &self,