sending.rs: factor out event handlers into separate functions

This allows for much nicer control flow, since they don't need to return
Result<OutgoingKind, (OutgoingKind, Error)>.
This commit is contained in:
Lambda 2024-05-22 20:34:47 +00:00
parent 5caafdec06
commit 13f79dfee1

View file

@ -627,280 +627,244 @@ impl Service {
} }
#[tracing::instrument(skip(events))] #[tracing::instrument(skip(events))]
async fn handle_appservice_event(
id: &str,
events: Vec<SendingEventType>,
) -> Result<()> {
let mut pdu_jsons = Vec::new();
for event in &events {
match event {
SendingEventType::Pdu(pdu_id) => {
pdu_jsons.push(
services()
.rooms
.timeline
.get_pdu_from_id(pdu_id)?
.ok_or_else(|| {
Error::bad_database(
"[Appservice] Event in \
servernameevent_data not found in db.",
)
})?
.to_room_event(),
);
}
SendingEventType::Edu(_) => {
// Appservices don't need EDUs (?)
}
}
}
let permit = services().sending.maximum_requests.acquire().await;
appservice_server::send_request(
services().appservice.get_registration(id).await.ok_or_else(
|| {
Error::bad_database(
"[Appservice] Could not load registration from db.",
)
},
)?,
appservice::event::push_events::v1::Request {
events: pdu_jsons,
txn_id: general_purpose::URL_SAFE_NO_PAD
.encode(calculate_hash(
&events
.iter()
.map(|e| match e {
SendingEventType::Edu(b)
| SendingEventType::Pdu(b) => &**b,
})
.collect::<Vec<_>>(),
))
.into(),
},
)
.await?;
drop(permit);
Ok(())
}
#[tracing::instrument(skip(events))]
async fn handle_push_event(
userid: &UserId,
pushkey: &str,
events: Vec<SendingEventType>,
) -> Result<()> {
let mut pdus = Vec::new();
for event in &events {
match event {
SendingEventType::Pdu(pdu_id) => {
pdus.push(
services()
.rooms
.timeline
.get_pdu_from_id(pdu_id)?
.ok_or_else(|| {
Error::bad_database(
"[Push] Event in servernamevent_datas not \
found in db.",
)
})?,
);
}
// Push gateways don't need EDUs (?)
SendingEventType::Edu(_) => {}
}
}
for pdu in pdus {
// Redacted events are not notification targets (we don't
// send push for them)
if let Some(unsigned) = &pdu.unsigned {
if let Ok(unsigned) =
serde_json::from_str::<serde_json::Value>(unsigned.get())
{
if unsigned.get("redacted_because").is_some() {
continue;
}
}
}
let Some(pusher) = services().pusher.get_pusher(userid, pushkey)?
else {
continue;
};
let rules_for_user = services()
.account_data
.get(
None,
userid,
GlobalAccountDataEventType::PushRules.to_string().into(),
)
.unwrap_or_default()
.and_then(|event| {
serde_json::from_str::<PushRulesEvent>(event.get()).ok()
})
.map_or_else(
|| push::Ruleset::server_default(userid),
|ev: PushRulesEvent| ev.content.global,
);
let unread: UInt = services()
.rooms
.user
.notification_count(userid, &pdu.room_id)?
.try_into()
.expect("notification count can't go that high");
let permit = services().sending.maximum_requests.acquire().await;
services()
.pusher
.send_push_notice(userid, unread, &pusher, rules_for_user, &pdu)
.await?;
drop(permit);
}
Ok(())
}
#[tracing::instrument(skip(events))]
async fn handle_federation_event(
server: &ServerName,
events: Vec<SendingEventType>,
) -> Result<()> {
let mut edu_jsons = Vec::new();
let mut pdu_jsons = Vec::new();
for event in &events {
match event {
SendingEventType::Pdu(pdu_id) => {
// TODO: check room version and remove event_id if
// needed
pdu_jsons.push(
PduEvent::convert_to_outgoing_federation_event(
services()
.rooms
.timeline
.get_pdu_json_from_id(pdu_id)?
.ok_or_else(|| {
error!(
"event not found: {server} {pdu_id:?}"
);
Error::bad_database(
"[Normal] Event in \
servernamevent_datas not found in db.",
)
})?,
),
);
}
SendingEventType::Edu(edu) => {
if let Ok(raw) = serde_json::from_slice(edu) {
edu_jsons.push(raw);
}
}
}
}
let permit = services().sending.maximum_requests.acquire().await;
let response = server_server::send_request(
server,
send_transaction_message::v1::Request {
origin: services().globals.server_name().to_owned(),
pdus: pdu_jsons,
edus: edu_jsons,
origin_server_ts: MilliSecondsSinceUnixEpoch::now(),
transaction_id: general_purpose::URL_SAFE_NO_PAD
.encode(calculate_hash(
&events
.iter()
.map(|e| match e {
SendingEventType::Edu(b)
| SendingEventType::Pdu(b) => &**b,
})
.collect::<Vec<_>>(),
))
.into(),
},
)
.await?;
for pdu in response.pdus {
if pdu.1.is_err() {
warn!("Failed to send to {}: {:?}", server, pdu);
}
}
drop(permit);
Ok(())
}
#[tracing::instrument(skip_all)]
async fn handle_events( async fn handle_events(
kind: OutgoingKind, kind: OutgoingKind,
events: Vec<SendingEventType>, events: Vec<SendingEventType>,
) -> HandlerResponse { ) -> HandlerResponse {
match &kind { let ret = match &kind {
OutgoingKind::Appservice(id) => { OutgoingKind::Appservice(id) => {
let mut pdu_jsons = Vec::new(); Self::handle_appservice_event(id, events).await
for event in &events {
match event {
SendingEventType::Pdu(pdu_id) => {
pdu_jsons.push(
services()
.rooms
.timeline
.get_pdu_from_id(pdu_id)
.map_err(|e| (kind.clone(), e))?
.ok_or_else(|| {
(
kind.clone(),
Error::bad_database(
"[Appservice] Event in \
servernameevent_data not \
found in db.",
),
)
})?
.to_room_event(),
);
}
SendingEventType::Edu(_) => {
// Appservices don't need EDUs (?)
}
}
}
let permit =
services().sending.maximum_requests.acquire().await;
let response = match appservice_server::send_request(
services()
.appservice
.get_registration(id)
.await
.ok_or_else(|| {
(
kind.clone(),
Error::bad_database(
"[Appservice] Could not load registration \
from db.",
),
)
})?,
appservice::event::push_events::v1::Request {
events: pdu_jsons,
txn_id: (&*general_purpose::URL_SAFE_NO_PAD.encode(
calculate_hash(
&events
.iter()
.map(|e| match e {
SendingEventType::Edu(b)
| SendingEventType::Pdu(b) => &**b,
})
.collect::<Vec<_>>(),
),
))
.into(),
},
)
.await
{
Ok(_) => Ok(kind.clone()),
Err(e) => Err((kind.clone(), e)),
};
drop(permit);
response
} }
OutgoingKind::Push(userid, pushkey) => { OutgoingKind::Push(userid, pushkey) => {
let mut pdus = Vec::new(); Self::handle_push_event(userid, pushkey, events).await
for event in &events {
match event {
SendingEventType::Pdu(pdu_id) => {
pdus.push(
services()
.rooms
.timeline
.get_pdu_from_id(pdu_id)
.map_err(|e| (kind.clone(), e))?
.ok_or_else(|| {
(
kind.clone(),
Error::bad_database(
"[Push] Event in \
servernamevent_datas not \
found in db.",
),
)
})?,
);
}
// Push gateways don't need EDUs (?)
SendingEventType::Edu(_) => {}
}
}
for pdu in pdus {
// Redacted events are not notification targets (we don't
// send push for them)
if let Some(unsigned) = &pdu.unsigned {
if let Ok(unsigned) =
serde_json::from_str::<serde_json::Value>(
unsigned.get(),
)
{
if unsigned.get("redacted_because").is_some() {
continue;
}
}
}
let Some(pusher) = services()
.pusher
.get_pusher(userid, pushkey)
.map_err(|e| {
(
OutgoingKind::Push(
userid.clone(),
pushkey.clone(),
),
e,
)
})?
else {
continue;
};
let rules_for_user = services()
.account_data
.get(
None,
userid,
GlobalAccountDataEventType::PushRules
.to_string()
.into(),
)
.unwrap_or_default()
.and_then(|event| {
serde_json::from_str::<PushRulesEvent>(event.get())
.ok()
})
.map_or_else(
|| push::Ruleset::server_default(userid),
|ev: PushRulesEvent| ev.content.global,
);
let unread: UInt = services()
.rooms
.user
.notification_count(userid, &pdu.room_id)
.map_err(|e| (kind.clone(), e))?
.try_into()
.expect("notification count can't go that high");
let permit =
services().sending.maximum_requests.acquire().await;
let _response = services()
.pusher
.send_push_notice(
userid,
unread,
&pusher,
rules_for_user,
&pdu,
)
.await
.map(|_response| kind.clone())
.map_err(|e| (kind.clone(), e));
drop(permit);
}
Ok(OutgoingKind::Push(userid.clone(), pushkey.clone()))
} }
OutgoingKind::Normal(server) => { OutgoingKind::Normal(server) => {
let mut edu_jsons = Vec::new(); Self::handle_federation_event(server, events).await
let mut pdu_jsons = Vec::new();
for event in &events {
match event {
SendingEventType::Pdu(pdu_id) => {
// TODO: check room version and remove event_id if
// needed
let raw =
PduEvent::convert_to_outgoing_federation_event(
services()
.rooms
.timeline
.get_pdu_json_from_id(pdu_id)
.map_err(|e| {
(
OutgoingKind::Normal(
server.clone(),
),
e,
)
})?
.ok_or_else(|| {
error!(
"event not found: {server} \
{pdu_id:?}"
);
(
OutgoingKind::Normal(
server.clone(),
),
Error::bad_database(
"[Normal] Event in \
servernamevent_datas not \
found in db.",
),
)
})?,
);
pdu_jsons.push(raw);
}
SendingEventType::Edu(edu) => {
if let Ok(raw) = serde_json::from_slice(edu) {
edu_jsons.push(raw);
}
}
}
}
let permit =
services().sending.maximum_requests.acquire().await;
let response = server_server::send_request(
server,
send_transaction_message::v1::Request {
origin: services().globals.server_name().to_owned(),
pdus: pdu_jsons,
edus: edu_jsons,
origin_server_ts: MilliSecondsSinceUnixEpoch::now(),
transaction_id: (&*general_purpose::URL_SAFE_NO_PAD
.encode(calculate_hash(
&events
.iter()
.map(|e| match e {
SendingEventType::Edu(b)
| SendingEventType::Pdu(b) => &**b,
})
.collect::<Vec<_>>(),
)))
.into(),
},
)
.await
.map(|response| {
for pdu in response.pdus {
if pdu.1.is_err() {
warn!("Failed to send to {}: {:?}", server, pdu);
}
}
kind.clone()
})
.map_err(|e| (kind, e));
drop(permit);
response
} }
};
match ret {
Ok(()) => Ok(kind),
Err(e) => Err((kind, e)),
} }
} }