sending.rs: move handler functions out of service

These don't take `self`, no reason for them to be associated functions.
This commit is contained in:
Lambda 2024-05-22 20:59:36 +00:00
parent 13f79dfee1
commit 9071e11e06

View file

@ -191,7 +191,7 @@ impl Service {
for (outgoing_kind, events) in initial_transactions {
current_transaction_status
.insert(outgoing_kind.clone(), TransactionStatus::Running);
futures.push(Self::handle_events(outgoing_kind.clone(), events));
futures.push(handle_events(outgoing_kind.clone(), events));
}
loop {
@ -204,7 +204,7 @@ impl Service {
response,
&mut current_transaction_status,
)? {
futures.push(Self::handle_events(kind, events));
futures.push(handle_events(kind, events));
}
}
Some(data) = receiver.recv() => {
@ -214,7 +214,7 @@ impl Service {
}) = self
.handle_receiver(data, &mut current_transaction_status)
{
futures.push(Self::handle_events(kind, events));
futures.push(handle_events(kind, events));
}
}
}
@ -626,248 +626,6 @@ impl Service {
Ok(())
}
#[tracing::instrument(skip(events))]
async fn handle_appservice_event(
id: &str,
events: Vec<SendingEventType>,
) -> Result<()> {
let mut pdu_jsons = Vec::new();
for event in &events {
match event {
SendingEventType::Pdu(pdu_id) => {
pdu_jsons.push(
services()
.rooms
.timeline
.get_pdu_from_id(pdu_id)?
.ok_or_else(|| {
Error::bad_database(
"[Appservice] Event in \
servernameevent_data not found in db.",
)
})?
.to_room_event(),
);
}
SendingEventType::Edu(_) => {
// Appservices don't need EDUs (?)
}
}
}
let permit = services().sending.maximum_requests.acquire().await;
appservice_server::send_request(
services().appservice.get_registration(id).await.ok_or_else(
|| {
Error::bad_database(
"[Appservice] Could not load registration from db.",
)
},
)?,
appservice::event::push_events::v1::Request {
events: pdu_jsons,
txn_id: general_purpose::URL_SAFE_NO_PAD
.encode(calculate_hash(
&events
.iter()
.map(|e| match e {
SendingEventType::Edu(b)
| SendingEventType::Pdu(b) => &**b,
})
.collect::<Vec<_>>(),
))
.into(),
},
)
.await?;
drop(permit);
Ok(())
}
#[tracing::instrument(skip(events))]
async fn handle_push_event(
userid: &UserId,
pushkey: &str,
events: Vec<SendingEventType>,
) -> Result<()> {
let mut pdus = Vec::new();
for event in &events {
match event {
SendingEventType::Pdu(pdu_id) => {
pdus.push(
services()
.rooms
.timeline
.get_pdu_from_id(pdu_id)?
.ok_or_else(|| {
Error::bad_database(
"[Push] Event in servernamevent_datas not \
found in db.",
)
})?,
);
}
// Push gateways don't need EDUs (?)
SendingEventType::Edu(_) => {}
}
}
for pdu in pdus {
// Redacted events are not notification targets (we don't
// send push for them)
if let Some(unsigned) = &pdu.unsigned {
if let Ok(unsigned) =
serde_json::from_str::<serde_json::Value>(unsigned.get())
{
if unsigned.get("redacted_because").is_some() {
continue;
}
}
}
let Some(pusher) = services().pusher.get_pusher(userid, pushkey)?
else {
continue;
};
let rules_for_user = services()
.account_data
.get(
None,
userid,
GlobalAccountDataEventType::PushRules.to_string().into(),
)
.unwrap_or_default()
.and_then(|event| {
serde_json::from_str::<PushRulesEvent>(event.get()).ok()
})
.map_or_else(
|| push::Ruleset::server_default(userid),
|ev: PushRulesEvent| ev.content.global,
);
let unread: UInt = services()
.rooms
.user
.notification_count(userid, &pdu.room_id)?
.try_into()
.expect("notification count can't go that high");
let permit = services().sending.maximum_requests.acquire().await;
services()
.pusher
.send_push_notice(userid, unread, &pusher, rules_for_user, &pdu)
.await?;
drop(permit);
}
Ok(())
}
#[tracing::instrument(skip(events))]
async fn handle_federation_event(
server: &ServerName,
events: Vec<SendingEventType>,
) -> Result<()> {
let mut edu_jsons = Vec::new();
let mut pdu_jsons = Vec::new();
for event in &events {
match event {
SendingEventType::Pdu(pdu_id) => {
// TODO: check room version and remove event_id if
// needed
pdu_jsons.push(
PduEvent::convert_to_outgoing_federation_event(
services()
.rooms
.timeline
.get_pdu_json_from_id(pdu_id)?
.ok_or_else(|| {
error!(
"event not found: {server} {pdu_id:?}"
);
Error::bad_database(
"[Normal] Event in \
servernamevent_datas not found in db.",
)
})?,
),
);
}
SendingEventType::Edu(edu) => {
if let Ok(raw) = serde_json::from_slice(edu) {
edu_jsons.push(raw);
}
}
}
}
let permit = services().sending.maximum_requests.acquire().await;
let response = server_server::send_request(
server,
send_transaction_message::v1::Request {
origin: services().globals.server_name().to_owned(),
pdus: pdu_jsons,
edus: edu_jsons,
origin_server_ts: MilliSecondsSinceUnixEpoch::now(),
transaction_id: general_purpose::URL_SAFE_NO_PAD
.encode(calculate_hash(
&events
.iter()
.map(|e| match e {
SendingEventType::Edu(b)
| SendingEventType::Pdu(b) => &**b,
})
.collect::<Vec<_>>(),
))
.into(),
},
)
.await?;
for pdu in response.pdus {
if pdu.1.is_err() {
warn!("Failed to send to {}: {:?}", server, pdu);
}
}
drop(permit);
Ok(())
}
#[tracing::instrument(skip_all)]
async fn handle_events(
kind: OutgoingKind,
events: Vec<SendingEventType>,
) -> HandlerResponse {
let ret = match &kind {
OutgoingKind::Appservice(id) => {
Self::handle_appservice_event(id, events).await
}
OutgoingKind::Push(userid, pushkey) => {
Self::handle_push_event(userid, pushkey, events).await
}
OutgoingKind::Normal(server) => {
Self::handle_federation_event(server, events).await
}
};
match ret {
Ok(()) => Ok(kind),
Err(e) => Err((kind, e)),
}
}
#[tracing::instrument(skip(self, request))]
pub(crate) async fn send_federation_request<T>(
&self,
@ -918,3 +676,239 @@ impl Service {
response
}
}
#[tracing::instrument(skip(events))]
async fn handle_appservice_event(
id: &str,
events: Vec<SendingEventType>,
) -> Result<()> {
let mut pdu_jsons = Vec::new();
for event in &events {
match event {
SendingEventType::Pdu(pdu_id) => {
pdu_jsons.push(
services()
.rooms
.timeline
.get_pdu_from_id(pdu_id)?
.ok_or_else(|| {
Error::bad_database(
"[Appservice] Event in servernameevent_data \
not found in db.",
)
})?
.to_room_event(),
);
}
SendingEventType::Edu(_) => {
// Appservices don't need EDUs (?)
}
}
}
let permit = services().sending.maximum_requests.acquire().await;
appservice_server::send_request(
services().appservice.get_registration(id).await.ok_or_else(|| {
Error::bad_database(
"[Appservice] Could not load registration from db.",
)
})?,
appservice::event::push_events::v1::Request {
events: pdu_jsons,
txn_id: general_purpose::URL_SAFE_NO_PAD
.encode(calculate_hash(
&events
.iter()
.map(|e| match e {
SendingEventType::Edu(b)
| SendingEventType::Pdu(b) => &**b,
})
.collect::<Vec<_>>(),
))
.into(),
},
)
.await?;
drop(permit);
Ok(())
}
#[tracing::instrument(skip(events))]
async fn handle_push_event(
userid: &UserId,
pushkey: &str,
events: Vec<SendingEventType>,
) -> Result<()> {
let mut pdus = Vec::new();
for event in &events {
match event {
SendingEventType::Pdu(pdu_id) => {
pdus.push(
services()
.rooms
.timeline
.get_pdu_from_id(pdu_id)?
.ok_or_else(|| {
Error::bad_database(
"[Push] Event in servernamevent_datas not \
found in db.",
)
})?,
);
}
// Push gateways don't need EDUs (?)
SendingEventType::Edu(_) => {}
}
}
for pdu in pdus {
// Redacted events are not notification targets (we don't
// send push for them)
if let Some(unsigned) = &pdu.unsigned {
if let Ok(unsigned) =
serde_json::from_str::<serde_json::Value>(unsigned.get())
{
if unsigned.get("redacted_because").is_some() {
continue;
}
}
}
let Some(pusher) = services().pusher.get_pusher(userid, pushkey)?
else {
continue;
};
let rules_for_user = services()
.account_data
.get(
None,
userid,
GlobalAccountDataEventType::PushRules.to_string().into(),
)
.unwrap_or_default()
.and_then(|event| {
serde_json::from_str::<PushRulesEvent>(event.get()).ok()
})
.map_or_else(
|| push::Ruleset::server_default(userid),
|ev: PushRulesEvent| ev.content.global,
);
let unread: UInt = services()
.rooms
.user
.notification_count(userid, &pdu.room_id)?
.try_into()
.expect("notification count can't go that high");
let permit = services().sending.maximum_requests.acquire().await;
services()
.pusher
.send_push_notice(userid, unread, &pusher, rules_for_user, &pdu)
.await?;
drop(permit);
}
Ok(())
}
#[tracing::instrument(skip(events))]
async fn handle_federation_event(
server: &ServerName,
events: Vec<SendingEventType>,
) -> Result<()> {
let mut edu_jsons = Vec::new();
let mut pdu_jsons = Vec::new();
for event in &events {
match event {
SendingEventType::Pdu(pdu_id) => {
// TODO: check room version and remove event_id if
// needed
pdu_jsons.push(PduEvent::convert_to_outgoing_federation_event(
services()
.rooms
.timeline
.get_pdu_json_from_id(pdu_id)?
.ok_or_else(|| {
error!("event not found: {server} {pdu_id:?}");
Error::bad_database(
"[Normal] Event in servernamevent_datas not \
found in db.",
)
})?,
));
}
SendingEventType::Edu(edu) => {
if let Ok(raw) = serde_json::from_slice(edu) {
edu_jsons.push(raw);
}
}
}
}
let permit = services().sending.maximum_requests.acquire().await;
let response = server_server::send_request(
server,
send_transaction_message::v1::Request {
origin: services().globals.server_name().to_owned(),
pdus: pdu_jsons,
edus: edu_jsons,
origin_server_ts: MilliSecondsSinceUnixEpoch::now(),
transaction_id: general_purpose::URL_SAFE_NO_PAD
.encode(calculate_hash(
&events
.iter()
.map(|e| match e {
SendingEventType::Edu(b)
| SendingEventType::Pdu(b) => &**b,
})
.collect::<Vec<_>>(),
))
.into(),
},
)
.await?;
for pdu in response.pdus {
if pdu.1.is_err() {
warn!("Failed to send to {}: {:?}", server, pdu);
}
}
drop(permit);
Ok(())
}
#[tracing::instrument(skip_all)]
async fn handle_events(
kind: OutgoingKind,
events: Vec<SendingEventType>,
) -> HandlerResponse {
let ret = match &kind {
OutgoingKind::Appservice(id) => {
handle_appservice_event(id, events).await
}
OutgoingKind::Push(userid, pushkey) => {
handle_push_event(userid, pushkey, events).await
}
OutgoingKind::Normal(server) => {
handle_federation_event(server, events).await
}
};
match ret {
Ok(()) => Ok(kind),
Err(e) => Err((kind, e)),
}
}