diff --git a/src/service/sending.rs b/src/service/sending.rs index 021bec20..3dbfb714 100644 --- a/src/service/sending.rs +++ b/src/service/sending.rs @@ -627,280 +627,244 @@ impl Service { } #[tracing::instrument(skip(events))] + async fn handle_appservice_event( + id: &str, + events: Vec, + ) -> Result<()> { + let mut pdu_jsons = Vec::new(); + + for event in &events { + match event { + SendingEventType::Pdu(pdu_id) => { + pdu_jsons.push( + services() + .rooms + .timeline + .get_pdu_from_id(pdu_id)? + .ok_or_else(|| { + Error::bad_database( + "[Appservice] Event in \ + servernameevent_data not found in db.", + ) + })? + .to_room_event(), + ); + } + SendingEventType::Edu(_) => { + // Appservices don't need EDUs (?) + } + } + } + + let permit = services().sending.maximum_requests.acquire().await; + + appservice_server::send_request( + services().appservice.get_registration(id).await.ok_or_else( + || { + Error::bad_database( + "[Appservice] Could not load registration from db.", + ) + }, + )?, + appservice::event::push_events::v1::Request { + events: pdu_jsons, + txn_id: general_purpose::URL_SAFE_NO_PAD + .encode(calculate_hash( + &events + .iter() + .map(|e| match e { + SendingEventType::Edu(b) + | SendingEventType::Pdu(b) => &**b, + }) + .collect::>(), + )) + .into(), + }, + ) + .await?; + + drop(permit); + + Ok(()) + } + + #[tracing::instrument(skip(events))] + async fn handle_push_event( + userid: &UserId, + pushkey: &str, + events: Vec, + ) -> Result<()> { + let mut pdus = Vec::new(); + + for event in &events { + match event { + SendingEventType::Pdu(pdu_id) => { + pdus.push( + services() + .rooms + .timeline + .get_pdu_from_id(pdu_id)? + .ok_or_else(|| { + Error::bad_database( + "[Push] Event in servernamevent_datas not \ + found in db.", + ) + })?, + ); + } + // Push gateways don't need EDUs (?) + SendingEventType::Edu(_) => {} + } + } + + for pdu in pdus { + // Redacted events are not notification targets (we don't + // send push for them) + if let Some(unsigned) = &pdu.unsigned { + if let Ok(unsigned) = + serde_json::from_str::(unsigned.get()) + { + if unsigned.get("redacted_because").is_some() { + continue; + } + } + } + + let Some(pusher) = services().pusher.get_pusher(userid, pushkey)? + else { + continue; + }; + + let rules_for_user = services() + .account_data + .get( + None, + userid, + GlobalAccountDataEventType::PushRules.to_string().into(), + ) + .unwrap_or_default() + .and_then(|event| { + serde_json::from_str::(event.get()).ok() + }) + .map_or_else( + || push::Ruleset::server_default(userid), + |ev: PushRulesEvent| ev.content.global, + ); + + let unread: UInt = services() + .rooms + .user + .notification_count(userid, &pdu.room_id)? + .try_into() + .expect("notification count can't go that high"); + + let permit = services().sending.maximum_requests.acquire().await; + + services() + .pusher + .send_push_notice(userid, unread, &pusher, rules_for_user, &pdu) + .await?; + + drop(permit); + } + + Ok(()) + } + + #[tracing::instrument(skip(events))] + async fn handle_federation_event( + server: &ServerName, + events: Vec, + ) -> Result<()> { + let mut edu_jsons = Vec::new(); + let mut pdu_jsons = Vec::new(); + + for event in &events { + match event { + SendingEventType::Pdu(pdu_id) => { + // TODO: check room version and remove event_id if + // needed + pdu_jsons.push( + PduEvent::convert_to_outgoing_federation_event( + services() + .rooms + .timeline + .get_pdu_json_from_id(pdu_id)? + .ok_or_else(|| { + error!( + "event not found: {server} {pdu_id:?}" + ); + Error::bad_database( + "[Normal] Event in \ + servernamevent_datas not found in db.", + ) + })?, + ), + ); + } + SendingEventType::Edu(edu) => { + if let Ok(raw) = serde_json::from_slice(edu) { + edu_jsons.push(raw); + } + } + } + } + + let permit = services().sending.maximum_requests.acquire().await; + + let response = server_server::send_request( + server, + send_transaction_message::v1::Request { + origin: services().globals.server_name().to_owned(), + pdus: pdu_jsons, + edus: edu_jsons, + origin_server_ts: MilliSecondsSinceUnixEpoch::now(), + transaction_id: general_purpose::URL_SAFE_NO_PAD + .encode(calculate_hash( + &events + .iter() + .map(|e| match e { + SendingEventType::Edu(b) + | SendingEventType::Pdu(b) => &**b, + }) + .collect::>(), + )) + .into(), + }, + ) + .await?; + + for pdu in response.pdus { + if pdu.1.is_err() { + warn!("Failed to send to {}: {:?}", server, pdu); + } + } + + drop(permit); + + Ok(()) + } + + #[tracing::instrument(skip_all)] async fn handle_events( kind: OutgoingKind, events: Vec, ) -> HandlerResponse { - match &kind { + let ret = match &kind { OutgoingKind::Appservice(id) => { - let mut pdu_jsons = Vec::new(); - - for event in &events { - match event { - SendingEventType::Pdu(pdu_id) => { - pdu_jsons.push( - services() - .rooms - .timeline - .get_pdu_from_id(pdu_id) - .map_err(|e| (kind.clone(), e))? - .ok_or_else(|| { - ( - kind.clone(), - Error::bad_database( - "[Appservice] Event in \ - servernameevent_data not \ - found in db.", - ), - ) - })? - .to_room_event(), - ); - } - SendingEventType::Edu(_) => { - // Appservices don't need EDUs (?) - } - } - } - - let permit = - services().sending.maximum_requests.acquire().await; - - let response = match appservice_server::send_request( - services() - .appservice - .get_registration(id) - .await - .ok_or_else(|| { - ( - kind.clone(), - Error::bad_database( - "[Appservice] Could not load registration \ - from db.", - ), - ) - })?, - appservice::event::push_events::v1::Request { - events: pdu_jsons, - txn_id: (&*general_purpose::URL_SAFE_NO_PAD.encode( - calculate_hash( - &events - .iter() - .map(|e| match e { - SendingEventType::Edu(b) - | SendingEventType::Pdu(b) => &**b, - }) - .collect::>(), - ), - )) - .into(), - }, - ) - .await - { - Ok(_) => Ok(kind.clone()), - Err(e) => Err((kind.clone(), e)), - }; - - drop(permit); - - response + Self::handle_appservice_event(id, events).await } OutgoingKind::Push(userid, pushkey) => { - let mut pdus = Vec::new(); - - for event in &events { - match event { - SendingEventType::Pdu(pdu_id) => { - pdus.push( - services() - .rooms - .timeline - .get_pdu_from_id(pdu_id) - .map_err(|e| (kind.clone(), e))? - .ok_or_else(|| { - ( - kind.clone(), - Error::bad_database( - "[Push] Event in \ - servernamevent_datas not \ - found in db.", - ), - ) - })?, - ); - } - // Push gateways don't need EDUs (?) - SendingEventType::Edu(_) => {} - } - } - - for pdu in pdus { - // Redacted events are not notification targets (we don't - // send push for them) - if let Some(unsigned) = &pdu.unsigned { - if let Ok(unsigned) = - serde_json::from_str::( - unsigned.get(), - ) - { - if unsigned.get("redacted_because").is_some() { - continue; - } - } - } - - let Some(pusher) = services() - .pusher - .get_pusher(userid, pushkey) - .map_err(|e| { - ( - OutgoingKind::Push( - userid.clone(), - pushkey.clone(), - ), - e, - ) - })? - else { - continue; - }; - - let rules_for_user = services() - .account_data - .get( - None, - userid, - GlobalAccountDataEventType::PushRules - .to_string() - .into(), - ) - .unwrap_or_default() - .and_then(|event| { - serde_json::from_str::(event.get()) - .ok() - }) - .map_or_else( - || push::Ruleset::server_default(userid), - |ev: PushRulesEvent| ev.content.global, - ); - - let unread: UInt = services() - .rooms - .user - .notification_count(userid, &pdu.room_id) - .map_err(|e| (kind.clone(), e))? - .try_into() - .expect("notification count can't go that high"); - - let permit = - services().sending.maximum_requests.acquire().await; - - let _response = services() - .pusher - .send_push_notice( - userid, - unread, - &pusher, - rules_for_user, - &pdu, - ) - .await - .map(|_response| kind.clone()) - .map_err(|e| (kind.clone(), e)); - - drop(permit); - } - Ok(OutgoingKind::Push(userid.clone(), pushkey.clone())) + Self::handle_push_event(userid, pushkey, events).await } OutgoingKind::Normal(server) => { - let mut edu_jsons = Vec::new(); - let mut pdu_jsons = Vec::new(); - - for event in &events { - match event { - SendingEventType::Pdu(pdu_id) => { - // TODO: check room version and remove event_id if - // needed - let raw = - PduEvent::convert_to_outgoing_federation_event( - services() - .rooms - .timeline - .get_pdu_json_from_id(pdu_id) - .map_err(|e| { - ( - OutgoingKind::Normal( - server.clone(), - ), - e, - ) - })? - .ok_or_else(|| { - error!( - "event not found: {server} \ - {pdu_id:?}" - ); - ( - OutgoingKind::Normal( - server.clone(), - ), - Error::bad_database( - "[Normal] Event in \ - servernamevent_datas not \ - found in db.", - ), - ) - })?, - ); - pdu_jsons.push(raw); - } - SendingEventType::Edu(edu) => { - if let Ok(raw) = serde_json::from_slice(edu) { - edu_jsons.push(raw); - } - } - } - } - - let permit = - services().sending.maximum_requests.acquire().await; - - let response = server_server::send_request( - server, - send_transaction_message::v1::Request { - origin: services().globals.server_name().to_owned(), - pdus: pdu_jsons, - edus: edu_jsons, - origin_server_ts: MilliSecondsSinceUnixEpoch::now(), - transaction_id: (&*general_purpose::URL_SAFE_NO_PAD - .encode(calculate_hash( - &events - .iter() - .map(|e| match e { - SendingEventType::Edu(b) - | SendingEventType::Pdu(b) => &**b, - }) - .collect::>(), - ))) - .into(), - }, - ) - .await - .map(|response| { - for pdu in response.pdus { - if pdu.1.is_err() { - warn!("Failed to send to {}: {:?}", server, pdu); - } - } - kind.clone() - }) - .map_err(|e| (kind, e)); - - drop(permit); - - response + Self::handle_federation_event(server, events).await } + }; + + match ret { + Ok(()) => Ok(kind), + Err(e) => Err((kind, e)), } }