sending.rs: add RequestData to replace big tuple

This commit is contained in:
Lambda 2024-05-22 18:40:44 +00:00
parent e294543ddb
commit 9961f1465f

View file

@ -101,16 +101,19 @@ impl RequestKey {
} }
} }
pub(crate) struct RequestData {
outgoing_kind: OutgoingKind,
event_type: SendingEventType,
key: RequestKey,
}
pub(crate) struct Service { pub(crate) struct Service {
db: &'static dyn Data, db: &'static dyn Data,
/// The state for a given state hash. /// The state for a given state hash.
pub(super) maximum_requests: Arc<Semaphore>, pub(super) maximum_requests: Arc<Semaphore>,
pub(crate) sender: pub(crate) sender: mpsc::UnboundedSender<RequestData>,
mpsc::UnboundedSender<(OutgoingKind, SendingEventType, RequestKey)>, receiver: Mutex<mpsc::UnboundedReceiver<RequestData>>,
receiver: Mutex<
mpsc::UnboundedReceiver<(OutgoingKind, SendingEventType, RequestKey)>,
>,
} }
#[derive(Debug)] #[derive(Debug)]
@ -202,12 +205,10 @@ impl Service {
{ {
futures.push(Self::handle_events(kind, events)); futures.push(Self::handle_events(kind, events));
}, },
Some((outgoing_kind, event, key)) = receiver.recv() => Some(data) = receiver.recv() =>
if let Some(HandlerInputs { kind, events }) = if let Some(HandlerInputs { kind, events }) =
self.handle_receiver( self.handle_receiver(
outgoing_kind, data,
event,
key,
&mut current_transaction_status, &mut current_transaction_status,
) )
{ {
@ -288,21 +289,23 @@ impl Service {
} }
#[tracing::instrument( #[tracing::instrument(
skip(self, event, key, current_transaction_status), skip(self, event_type, key, current_transaction_status),
fields( fields(
current_status = ?current_transaction_status.get(&outgoing_kind), current_status = ?current_transaction_status.get(&outgoing_kind),
), ),
)] )]
fn handle_receiver( fn handle_receiver(
&self, &self,
outgoing_kind: OutgoingKind, RequestData {
event: SendingEventType, outgoing_kind,
key: RequestKey, event_type,
key,
}: RequestData,
current_transaction_status: &mut TransactionStatusMap, current_transaction_status: &mut TransactionStatusMap,
) -> Option<HandlerInputs> { ) -> Option<HandlerInputs> {
if let Ok(Some(events)) = self.select_events( if let Ok(Some(events)) = self.select_events(
&outgoing_kind, &outgoing_kind,
vec![(event, key)], vec![(event_type, key)],
current_transaction_status, current_transaction_status,
) { ) {
Some(HandlerInputs { Some(HandlerInputs {
@ -531,11 +534,15 @@ impl Service {
pushkey: String, pushkey: String,
) -> Result<()> { ) -> Result<()> {
let outgoing_kind = OutgoingKind::Push(user.to_owned(), pushkey); let outgoing_kind = OutgoingKind::Push(user.to_owned(), pushkey);
let event = SendingEventType::Pdu(pdu_id.to_owned()); let event_type = SendingEventType::Pdu(pdu_id.to_owned());
let keys = let keys =
self.db.queue_requests(&[(&outgoing_kind, event.clone())])?; self.db.queue_requests(&[(&outgoing_kind, event_type.clone())])?;
self.sender self.sender
.send((outgoing_kind, event, keys.into_iter().next().unwrap())) .send(RequestData {
outgoing_kind,
event_type,
key: keys.into_iter().next().unwrap(),
})
.unwrap(); .unwrap();
Ok(()) Ok(())
@ -559,8 +566,15 @@ impl Service {
let keys = self.db.queue_requests( let keys = self.db.queue_requests(
&requests.iter().map(|(o, e)| (o, e.clone())).collect::<Vec<_>>(), &requests.iter().map(|(o, e)| (o, e.clone())).collect::<Vec<_>>(),
)?; )?;
for ((outgoing_kind, event), key) in requests.into_iter().zip(keys) { for ((outgoing_kind, event_type), key) in requests.into_iter().zip(keys)
self.sender.send((outgoing_kind.clone(), event, key)).unwrap(); {
self.sender
.send(RequestData {
outgoing_kind: outgoing_kind.clone(),
event_type,
key,
})
.unwrap();
} }
Ok(()) Ok(())
@ -574,11 +588,15 @@ impl Service {
id: u64, id: u64,
) -> Result<()> { ) -> Result<()> {
let outgoing_kind = OutgoingKind::Normal(server.to_owned()); let outgoing_kind = OutgoingKind::Normal(server.to_owned());
let event = SendingEventType::Edu(serialized); let event_type = SendingEventType::Edu(serialized);
let keys = let keys =
self.db.queue_requests(&[(&outgoing_kind, event.clone())])?; self.db.queue_requests(&[(&outgoing_kind, event_type.clone())])?;
self.sender self.sender
.send((outgoing_kind, event, keys.into_iter().next().unwrap())) .send(RequestData {
outgoing_kind,
event_type,
key: keys.into_iter().next().unwrap(),
})
.unwrap(); .unwrap();
Ok(()) Ok(())
@ -591,11 +609,15 @@ impl Service {
pdu_id: Vec<u8>, pdu_id: Vec<u8>,
) -> Result<()> { ) -> Result<()> {
let outgoing_kind = OutgoingKind::Appservice(appservice_id); let outgoing_kind = OutgoingKind::Appservice(appservice_id);
let event = SendingEventType::Pdu(pdu_id); let event_type = SendingEventType::Pdu(pdu_id);
let keys = let keys =
self.db.queue_requests(&[(&outgoing_kind, event.clone())])?; self.db.queue_requests(&[(&outgoing_kind, event_type.clone())])?;
self.sender self.sender
.send((outgoing_kind, event, keys.into_iter().next().unwrap())) .send(RequestData {
outgoing_kind,
event_type,
key: keys.into_iter().next().unwrap(),
})
.unwrap(); .unwrap();
Ok(()) Ok(())