diff --git a/src/service/sending.rs b/src/service/sending.rs index 3dbfb714..6e1c03a7 100644 --- a/src/service/sending.rs +++ b/src/service/sending.rs @@ -191,7 +191,7 @@ impl Service { for (outgoing_kind, events) in initial_transactions { current_transaction_status .insert(outgoing_kind.clone(), TransactionStatus::Running); - futures.push(Self::handle_events(outgoing_kind.clone(), events)); + futures.push(handle_events(outgoing_kind.clone(), events)); } loop { @@ -204,7 +204,7 @@ impl Service { response, &mut current_transaction_status, )? { - futures.push(Self::handle_events(kind, events)); + futures.push(handle_events(kind, events)); } } Some(data) = receiver.recv() => { @@ -214,7 +214,7 @@ impl Service { }) = self .handle_receiver(data, &mut current_transaction_status) { - futures.push(Self::handle_events(kind, events)); + futures.push(handle_events(kind, events)); } } } @@ -626,248 +626,6 @@ impl Service { Ok(()) } - #[tracing::instrument(skip(events))] - async fn handle_appservice_event( - id: &str, - events: Vec, - ) -> Result<()> { - let mut pdu_jsons = Vec::new(); - - for event in &events { - match event { - SendingEventType::Pdu(pdu_id) => { - pdu_jsons.push( - services() - .rooms - .timeline - .get_pdu_from_id(pdu_id)? - .ok_or_else(|| { - Error::bad_database( - "[Appservice] Event in \ - servernameevent_data not found in db.", - ) - })? - .to_room_event(), - ); - } - SendingEventType::Edu(_) => { - // Appservices don't need EDUs (?) - } - } - } - - let permit = services().sending.maximum_requests.acquire().await; - - appservice_server::send_request( - services().appservice.get_registration(id).await.ok_or_else( - || { - Error::bad_database( - "[Appservice] Could not load registration from db.", - ) - }, - )?, - appservice::event::push_events::v1::Request { - events: pdu_jsons, - txn_id: general_purpose::URL_SAFE_NO_PAD - .encode(calculate_hash( - &events - .iter() - .map(|e| match e { - SendingEventType::Edu(b) - | SendingEventType::Pdu(b) => &**b, - }) - .collect::>(), - )) - .into(), - }, - ) - .await?; - - drop(permit); - - Ok(()) - } - - #[tracing::instrument(skip(events))] - async fn handle_push_event( - userid: &UserId, - pushkey: &str, - events: Vec, - ) -> Result<()> { - let mut pdus = Vec::new(); - - for event in &events { - match event { - SendingEventType::Pdu(pdu_id) => { - pdus.push( - services() - .rooms - .timeline - .get_pdu_from_id(pdu_id)? - .ok_or_else(|| { - Error::bad_database( - "[Push] Event in servernamevent_datas not \ - found in db.", - ) - })?, - ); - } - // Push gateways don't need EDUs (?) - SendingEventType::Edu(_) => {} - } - } - - for pdu in pdus { - // Redacted events are not notification targets (we don't - // send push for them) - if let Some(unsigned) = &pdu.unsigned { - if let Ok(unsigned) = - serde_json::from_str::(unsigned.get()) - { - if unsigned.get("redacted_because").is_some() { - continue; - } - } - } - - let Some(pusher) = services().pusher.get_pusher(userid, pushkey)? - else { - continue; - }; - - let rules_for_user = services() - .account_data - .get( - None, - userid, - GlobalAccountDataEventType::PushRules.to_string().into(), - ) - .unwrap_or_default() - .and_then(|event| { - serde_json::from_str::(event.get()).ok() - }) - .map_or_else( - || push::Ruleset::server_default(userid), - |ev: PushRulesEvent| ev.content.global, - ); - - let unread: UInt = services() - .rooms - .user - .notification_count(userid, &pdu.room_id)? - .try_into() - .expect("notification count can't go that high"); - - let permit = services().sending.maximum_requests.acquire().await; - - services() - .pusher - .send_push_notice(userid, unread, &pusher, rules_for_user, &pdu) - .await?; - - drop(permit); - } - - Ok(()) - } - - #[tracing::instrument(skip(events))] - async fn handle_federation_event( - server: &ServerName, - events: Vec, - ) -> Result<()> { - let mut edu_jsons = Vec::new(); - let mut pdu_jsons = Vec::new(); - - for event in &events { - match event { - SendingEventType::Pdu(pdu_id) => { - // TODO: check room version and remove event_id if - // needed - pdu_jsons.push( - PduEvent::convert_to_outgoing_federation_event( - services() - .rooms - .timeline - .get_pdu_json_from_id(pdu_id)? - .ok_or_else(|| { - error!( - "event not found: {server} {pdu_id:?}" - ); - Error::bad_database( - "[Normal] Event in \ - servernamevent_datas not found in db.", - ) - })?, - ), - ); - } - SendingEventType::Edu(edu) => { - if let Ok(raw) = serde_json::from_slice(edu) { - edu_jsons.push(raw); - } - } - } - } - - let permit = services().sending.maximum_requests.acquire().await; - - let response = server_server::send_request( - server, - send_transaction_message::v1::Request { - origin: services().globals.server_name().to_owned(), - pdus: pdu_jsons, - edus: edu_jsons, - origin_server_ts: MilliSecondsSinceUnixEpoch::now(), - transaction_id: general_purpose::URL_SAFE_NO_PAD - .encode(calculate_hash( - &events - .iter() - .map(|e| match e { - SendingEventType::Edu(b) - | SendingEventType::Pdu(b) => &**b, - }) - .collect::>(), - )) - .into(), - }, - ) - .await?; - - for pdu in response.pdus { - if pdu.1.is_err() { - warn!("Failed to send to {}: {:?}", server, pdu); - } - } - - drop(permit); - - Ok(()) - } - - #[tracing::instrument(skip_all)] - async fn handle_events( - kind: OutgoingKind, - events: Vec, - ) -> HandlerResponse { - let ret = match &kind { - OutgoingKind::Appservice(id) => { - Self::handle_appservice_event(id, events).await - } - OutgoingKind::Push(userid, pushkey) => { - Self::handle_push_event(userid, pushkey, events).await - } - OutgoingKind::Normal(server) => { - Self::handle_federation_event(server, events).await - } - }; - - match ret { - Ok(()) => Ok(kind), - Err(e) => Err((kind, e)), - } - } - #[tracing::instrument(skip(self, request))] pub(crate) async fn send_federation_request( &self, @@ -918,3 +676,239 @@ impl Service { response } } + +#[tracing::instrument(skip(events))] +async fn handle_appservice_event( + id: &str, + events: Vec, +) -> Result<()> { + let mut pdu_jsons = Vec::new(); + + for event in &events { + match event { + SendingEventType::Pdu(pdu_id) => { + pdu_jsons.push( + services() + .rooms + .timeline + .get_pdu_from_id(pdu_id)? + .ok_or_else(|| { + Error::bad_database( + "[Appservice] Event in servernameevent_data \ + not found in db.", + ) + })? + .to_room_event(), + ); + } + SendingEventType::Edu(_) => { + // Appservices don't need EDUs (?) + } + } + } + + let permit = services().sending.maximum_requests.acquire().await; + + appservice_server::send_request( + services().appservice.get_registration(id).await.ok_or_else(|| { + Error::bad_database( + "[Appservice] Could not load registration from db.", + ) + })?, + appservice::event::push_events::v1::Request { + events: pdu_jsons, + txn_id: general_purpose::URL_SAFE_NO_PAD + .encode(calculate_hash( + &events + .iter() + .map(|e| match e { + SendingEventType::Edu(b) + | SendingEventType::Pdu(b) => &**b, + }) + .collect::>(), + )) + .into(), + }, + ) + .await?; + + drop(permit); + + Ok(()) +} + +#[tracing::instrument(skip(events))] +async fn handle_push_event( + userid: &UserId, + pushkey: &str, + events: Vec, +) -> Result<()> { + let mut pdus = Vec::new(); + + for event in &events { + match event { + SendingEventType::Pdu(pdu_id) => { + pdus.push( + services() + .rooms + .timeline + .get_pdu_from_id(pdu_id)? + .ok_or_else(|| { + Error::bad_database( + "[Push] Event in servernamevent_datas not \ + found in db.", + ) + })?, + ); + } + // Push gateways don't need EDUs (?) + SendingEventType::Edu(_) => {} + } + } + + for pdu in pdus { + // Redacted events are not notification targets (we don't + // send push for them) + if let Some(unsigned) = &pdu.unsigned { + if let Ok(unsigned) = + serde_json::from_str::(unsigned.get()) + { + if unsigned.get("redacted_because").is_some() { + continue; + } + } + } + + let Some(pusher) = services().pusher.get_pusher(userid, pushkey)? + else { + continue; + }; + + let rules_for_user = services() + .account_data + .get( + None, + userid, + GlobalAccountDataEventType::PushRules.to_string().into(), + ) + .unwrap_or_default() + .and_then(|event| { + serde_json::from_str::(event.get()).ok() + }) + .map_or_else( + || push::Ruleset::server_default(userid), + |ev: PushRulesEvent| ev.content.global, + ); + + let unread: UInt = services() + .rooms + .user + .notification_count(userid, &pdu.room_id)? + .try_into() + .expect("notification count can't go that high"); + + let permit = services().sending.maximum_requests.acquire().await; + + services() + .pusher + .send_push_notice(userid, unread, &pusher, rules_for_user, &pdu) + .await?; + + drop(permit); + } + + Ok(()) +} + +#[tracing::instrument(skip(events))] +async fn handle_federation_event( + server: &ServerName, + events: Vec, +) -> Result<()> { + let mut edu_jsons = Vec::new(); + let mut pdu_jsons = Vec::new(); + + for event in &events { + match event { + SendingEventType::Pdu(pdu_id) => { + // TODO: check room version and remove event_id if + // needed + pdu_jsons.push(PduEvent::convert_to_outgoing_federation_event( + services() + .rooms + .timeline + .get_pdu_json_from_id(pdu_id)? + .ok_or_else(|| { + error!("event not found: {server} {pdu_id:?}"); + Error::bad_database( + "[Normal] Event in servernamevent_datas not \ + found in db.", + ) + })?, + )); + } + SendingEventType::Edu(edu) => { + if let Ok(raw) = serde_json::from_slice(edu) { + edu_jsons.push(raw); + } + } + } + } + + let permit = services().sending.maximum_requests.acquire().await; + + let response = server_server::send_request( + server, + send_transaction_message::v1::Request { + origin: services().globals.server_name().to_owned(), + pdus: pdu_jsons, + edus: edu_jsons, + origin_server_ts: MilliSecondsSinceUnixEpoch::now(), + transaction_id: general_purpose::URL_SAFE_NO_PAD + .encode(calculate_hash( + &events + .iter() + .map(|e| match e { + SendingEventType::Edu(b) + | SendingEventType::Pdu(b) => &**b, + }) + .collect::>(), + )) + .into(), + }, + ) + .await?; + + for pdu in response.pdus { + if pdu.1.is_err() { + warn!("Failed to send to {}: {:?}", server, pdu); + } + } + + drop(permit); + + Ok(()) +} + +#[tracing::instrument(skip_all)] +async fn handle_events( + kind: OutgoingKind, + events: Vec, +) -> HandlerResponse { + let ret = match &kind { + OutgoingKind::Appservice(id) => { + handle_appservice_event(id, events).await + } + OutgoingKind::Push(userid, pushkey) => { + handle_push_event(userid, pushkey, events).await + } + OutgoingKind::Normal(server) => { + handle_federation_event(server, events).await + } + }; + + match ret { + Ok(()) => Ok(kind), + Err(e) => Err((kind, e)), + } +}