change rustfmt configuration

This change is fully automated, except the `rustfmt.toml` changes and
a few clippy directives to allow specific functions with too many lines
because they are longer now.
This commit is contained in:
Charles Hall 2024-05-16 01:19:04 -07:00
parent 40d6ce230d
commit 0afc1d2f50
No known key found for this signature in database
GPG key ID: 7B8E0645816E07CF
123 changed files with 7881 additions and 4687 deletions

View file

@ -1,7 +1,5 @@
mod data;
pub(crate) use data::Data;
use std::{
collections::{BTreeMap, HashMap, HashSet},
fmt::Debug,
@ -9,34 +7,29 @@ use std::{
time::{Duration, Instant},
};
use crate::{
api::{appservice_server, server_server},
services,
utils::calculate_hash,
Config, Error, PduEvent, Result,
};
use base64::{engine::general_purpose, Engine as _};
pub(crate) use data::Data;
use federation::transactions::send_transaction_message;
use futures_util::{stream::FuturesUnordered, StreamExt};
use base64::{engine::general_purpose, Engine as _};
use ruma::{
api::{
appservice::{self, Registration},
federation::{
self,
transactions::edu::{
DeviceListUpdateContent, Edu, ReceiptContent, ReceiptData, ReceiptMap,
DeviceListUpdateContent, Edu, ReceiptContent, ReceiptData,
ReceiptMap,
},
},
OutgoingRequest,
},
device_id,
events::{
push_rules::PushRulesEvent, receipt::ReceiptType, AnySyncEphemeralRoomEvent,
GlobalAccountDataEventType,
push_rules::PushRulesEvent, receipt::ReceiptType,
AnySyncEphemeralRoomEvent, GlobalAccountDataEventType,
},
push, uint, MilliSecondsSinceUnixEpoch, OwnedServerName, OwnedUserId, ServerName, UInt, UserId,
push, uint, MilliSecondsSinceUnixEpoch, OwnedServerName, OwnedUserId,
ServerName, UInt, UserId,
};
use tokio::{
select,
@ -44,6 +37,13 @@ use tokio::{
};
use tracing::{debug, error, warn};
use crate::{
api::{appservice_server, server_server},
services,
utils::calculate_hash,
Config, Error, PduEvent, Result,
};
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub(crate) enum OutgoingKind {
Appservice(String),
@ -64,7 +64,7 @@ impl OutgoingKind {
OutgoingKind::Push(user, pushkey) => {
let mut p = b"$".to_vec();
p.extend_from_slice(user.as_bytes());
p.push(0xff);
p.push(0xFF);
p.extend_from_slice(pushkey.as_bytes());
p
}
@ -74,7 +74,7 @@ impl OutgoingKind {
p
}
};
prefix.push(0xff);
prefix.push(0xFF);
prefix
}
@ -93,8 +93,11 @@ pub(crate) struct Service {
/// The state for a given state hash.
pub(super) maximum_requests: Arc<Semaphore>,
pub(crate) sender: mpsc::UnboundedSender<(OutgoingKind, SendingEventType, Vec<u8>)>,
receiver: Mutex<mpsc::UnboundedReceiver<(OutgoingKind, SendingEventType, Vec<u8>)>>,
pub(crate) sender:
mpsc::UnboundedSender<(OutgoingKind, SendingEventType, Vec<u8>)>,
receiver: Mutex<
mpsc::UnboundedReceiver<(OutgoingKind, SendingEventType, Vec<u8>)>,
>,
}
enum TransactionStatus {
@ -112,7 +115,9 @@ impl Service {
db,
sender,
receiver: Mutex::new(receiver),
maximum_requests: Arc::new(Semaphore::new(config.max_concurrent_requests.into())),
maximum_requests: Arc::new(Semaphore::new(
config.max_concurrent_requests.into(),
)),
})
}
@ -129,15 +134,18 @@ impl Service {
let mut futures = FuturesUnordered::new();
let mut current_transaction_status = HashMap::<OutgoingKind, TransactionStatus>::new();
let mut current_transaction_status =
HashMap::<OutgoingKind, TransactionStatus>::new();
// Retry requests we could not finish yet
let mut initial_transactions = HashMap::<OutgoingKind, Vec<SendingEventType>>::new();
let mut initial_transactions =
HashMap::<OutgoingKind, Vec<SendingEventType>>::new();
for (key, outgoing_kind, event) in self.db.active_requests().filter_map(Result::ok) {
let entry = initial_transactions
.entry(outgoing_kind.clone())
.or_default();
for (key, outgoing_kind, event) in
self.db.active_requests().filter_map(Result::ok)
{
let entry =
initial_transactions.entry(outgoing_kind.clone()).or_default();
if entry.len() > 30 {
warn!(
@ -152,77 +160,90 @@ impl Service {
}
for (outgoing_kind, events) in initial_transactions {
current_transaction_status.insert(outgoing_kind.clone(), TransactionStatus::Running);
current_transaction_status
.insert(outgoing_kind.clone(), TransactionStatus::Running);
futures.push(Self::handle_events(outgoing_kind.clone(), events));
}
let handle_futures = |response,
current_transaction_status: &mut HashMap<_, _>,
futures: &mut FuturesUnordered<_>| {
match response {
Ok(outgoing_kind) => {
self.db.delete_all_active_requests_for(&outgoing_kind)?;
let handle_futures =
|response,
current_transaction_status: &mut HashMap<_, _>,
futures: &mut FuturesUnordered<_>| {
match response {
Ok(outgoing_kind) => {
self.db
.delete_all_active_requests_for(&outgoing_kind)?;
// Find events that have been added since starting the
// last request
let new_events = self
.db
.queued_requests(&outgoing_kind)
.filter_map(Result::ok)
.take(30)
.collect::<Vec<_>>();
// Find events that have been added since starting the
// last request
let new_events = self
.db
.queued_requests(&outgoing_kind)
.filter_map(Result::ok)
.take(30)
.collect::<Vec<_>>();
if new_events.is_empty() {
current_transaction_status.remove(&outgoing_kind);
} else {
// Insert pdus we found
self.db.mark_as_active(&new_events)?;
if new_events.is_empty() {
current_transaction_status.remove(&outgoing_kind);
} else {
// Insert pdus we found
self.db.mark_as_active(&new_events)?;
futures.push(Self::handle_events(
outgoing_kind.clone(),
new_events.into_iter().map(|(event, _)| event).collect(),
));
futures.push(Self::handle_events(
outgoing_kind.clone(),
new_events
.into_iter()
.map(|(event, _)| event)
.collect(),
));
}
}
}
Err((outgoing_kind, _)) => {
current_transaction_status
.entry(outgoing_kind)
.and_modify(|e| {
*e = match e {
TransactionStatus::Running => {
TransactionStatus::Failed(1, Instant::now())
}
TransactionStatus::Retrying(n) => {
TransactionStatus::Failed(*n + 1, Instant::now())
}
TransactionStatus::Failed(..) => {
error!(
"Request that was not even \
Err((outgoing_kind, _)) => {
current_transaction_status
.entry(outgoing_kind)
.and_modify(|e| {
*e = match e {
TransactionStatus::Running => {
TransactionStatus::Failed(
1,
Instant::now(),
)
}
TransactionStatus::Retrying(n) => {
TransactionStatus::Failed(
*n + 1,
Instant::now(),
)
}
TransactionStatus::Failed(..) => {
error!(
"Request that was not even \
running failed?!"
);
return;
);
return;
}
}
}
});
}
});
}
};
Result::<_>::Ok(())
};
Result::<_>::Ok(())
};
let handle_receiver = |outgoing_kind,
event,
key,
current_transaction_status: &mut HashMap<_, _>,
futures: &mut FuturesUnordered<_>| {
if let Ok(Some(events)) = self.select_events(
&outgoing_kind,
vec![(event, key)],
current_transaction_status,
) {
futures.push(Self::handle_events(outgoing_kind, events));
}
};
let handle_receiver =
|outgoing_kind,
event,
key,
current_transaction_status: &mut HashMap<_, _>,
futures: &mut FuturesUnordered<_>| {
if let Ok(Some(events)) = self.select_events(
&outgoing_kind,
vec![(event, key)],
current_transaction_status,
) {
futures.push(Self::handle_events(outgoing_kind, events));
}
};
loop {
select! {
@ -244,13 +265,21 @@ impl Service {
}
}
#[tracing::instrument(skip(self, outgoing_kind, new_events, current_transaction_status))]
#[tracing::instrument(skip(
self,
outgoing_kind,
new_events,
current_transaction_status
))]
fn select_events(
&self,
outgoing_kind: &OutgoingKind,
// Events we want to send: event and full key
new_events: Vec<(SendingEventType, Vec<u8>)>,
current_transaction_status: &mut HashMap<OutgoingKind, TransactionStatus>,
current_transaction_status: &mut HashMap<
OutgoingKind,
TransactionStatus,
>,
) -> Result<Option<Vec<SendingEventType>>> {
let mut retry = false;
let mut allow = true;
@ -264,10 +293,14 @@ impl Service {
allow = false;
}
TransactionStatus::Failed(tries, time) => {
// Fail if a request has failed recently (exponential backoff)
let mut min_elapsed_duration = Duration::from_secs(30) * (*tries) * (*tries);
if min_elapsed_duration > Duration::from_secs(60 * 60 * 24) {
min_elapsed_duration = Duration::from_secs(60 * 60 * 24);
// Fail if a request has failed recently (exponential
// backoff)
let mut min_elapsed_duration =
Duration::from_secs(30) * (*tries) * (*tries);
if min_elapsed_duration > Duration::from_secs(60 * 60 * 24)
{
min_elapsed_duration =
Duration::from_secs(60 * 60 * 24);
}
if time.elapsed() < min_elapsed_duration {
@ -302,8 +335,12 @@ impl Service {
}
if let OutgoingKind::Normal(server_name) = outgoing_kind {
if let Ok((select_edus, last_count)) = self.select_edus(server_name) {
events.extend(select_edus.into_iter().map(SendingEventType::Edu));
if let Ok((select_edus, last_count)) =
self.select_edus(server_name)
{
events.extend(
select_edus.into_iter().map(SendingEventType::Edu),
);
self.db.set_latest_educount(server_name, last_count)?;
}
@ -314,14 +351,19 @@ impl Service {
}
#[tracing::instrument(skip(self, server_name))]
pub(crate) fn select_edus(&self, server_name: &ServerName) -> Result<(Vec<Vec<u8>>, u64)> {
pub(crate) fn select_edus(
&self,
server_name: &ServerName,
) -> Result<(Vec<Vec<u8>>, u64)> {
// u64: count of last edu
let since = self.db.get_latest_educount(server_name)?;
let mut events = Vec::new();
let mut max_edu_count = since;
let mut device_list_changes = HashSet::new();
'outer: for room_id in services().rooms.state_cache.server_rooms(server_name) {
'outer: for room_id in
services().rooms.state_cache.server_rooms(server_name)
{
let room_id = room_id?;
// Look for device list updates in this room
device_list_changes.extend(
@ -329,7 +371,10 @@ impl Service {
.users
.keys_changed(room_id.as_ref(), since, None)
.filter_map(Result::ok)
.filter(|user_id| user_id.server_name() == services().globals.server_name()),
.filter(|user_id| {
user_id.server_name()
== services().globals.server_name()
}),
);
// Look for read receipts in this room
@ -349,44 +394,57 @@ impl Service {
continue;
}
let event: AnySyncEphemeralRoomEvent =
serde_json::from_str(read_receipt.json().get())
.map_err(|_| Error::bad_database("Invalid edu event in read_receipts."))?;
let federation_event = if let AnySyncEphemeralRoomEvent::Receipt(r) = event {
let mut read = BTreeMap::new();
let event: AnySyncEphemeralRoomEvent = serde_json::from_str(
read_receipt.json().get(),
)
.map_err(|_| {
Error::bad_database("Invalid edu event in read_receipts.")
})?;
let federation_event =
if let AnySyncEphemeralRoomEvent::Receipt(r) = event {
let mut read = BTreeMap::new();
let (event_id, mut receipt) = r
.content
.0
.into_iter()
.next()
.expect("we only use one event per read receipt");
let receipt = receipt
.remove(&ReceiptType::Read)
.expect("our read receipts always set this")
.remove(&user_id)
.expect("our read receipts always have the user here");
let (event_id, mut receipt) =
r.content.0.into_iter().next().expect(
"we only use one event per read receipt",
);
let receipt = receipt
.remove(&ReceiptType::Read)
.expect("our read receipts always set this")
.remove(&user_id)
.expect(
"our read receipts always have the user here",
);
read.insert(
user_id,
ReceiptData {
data: receipt.clone(),
event_ids: vec![event_id.clone()],
},
);
read.insert(
user_id,
ReceiptData {
data: receipt.clone(),
event_ids: vec![event_id.clone()],
},
);
let receipt_map = ReceiptMap { read };
let receipt_map = ReceiptMap {
read,
};
let mut receipts = BTreeMap::new();
receipts.insert(room_id.clone(), receipt_map);
let mut receipts = BTreeMap::new();
receipts.insert(room_id.clone(), receipt_map);
Edu::Receipt(ReceiptContent { receipts })
} else {
Error::bad_database("Invalid event type in read_receipts");
continue;
};
Edu::Receipt(ReceiptContent {
receipts,
})
} else {
Error::bad_database(
"Invalid event type in read_receipts",
);
continue;
};
events.push(serde_json::to_vec(&federation_event).expect("json can be serialized"));
events.push(
serde_json::to_vec(&federation_event)
.expect("json can be serialized"),
);
if events.len() >= 20 {
break 'outer;
@ -407,7 +465,9 @@ impl Service {
keys: None,
});
events.push(serde_json::to_vec(&edu).expect("json can be serialized"));
events.push(
serde_json::to_vec(&edu).expect("json can be serialized"),
);
}
Ok((events, max_edu_count))
@ -422,7 +482,8 @@ impl Service {
) -> Result<()> {
let outgoing_kind = OutgoingKind::Push(user.to_owned(), pushkey);
let event = SendingEventType::Pdu(pdu_id.to_owned());
let keys = self.db.queue_requests(&[(&outgoing_kind, event.clone())])?;
let keys =
self.db.queue_requests(&[(&outgoing_kind, event.clone())])?;
self.sender
.send((outgoing_kind, event, keys.into_iter().next().unwrap()))
.unwrap();
@ -446,15 +507,10 @@ impl Service {
})
.collect::<Vec<_>>();
let keys = self.db.queue_requests(
&requests
.iter()
.map(|(o, e)| (o, e.clone()))
.collect::<Vec<_>>(),
&requests.iter().map(|(o, e)| (o, e.clone())).collect::<Vec<_>>(),
)?;
for ((outgoing_kind, event), key) in requests.into_iter().zip(keys) {
self.sender
.send((outgoing_kind.clone(), event, key))
.unwrap();
self.sender.send((outgoing_kind.clone(), event, key)).unwrap();
}
Ok(())
@ -469,7 +525,8 @@ impl Service {
) -> Result<()> {
let outgoing_kind = OutgoingKind::Normal(server.to_owned());
let event = SendingEventType::Edu(serialized);
let keys = self.db.queue_requests(&[(&outgoing_kind, event.clone())])?;
let keys =
self.db.queue_requests(&[(&outgoing_kind, event.clone())])?;
self.sender
.send((outgoing_kind, event, keys.into_iter().next().unwrap()))
.unwrap();
@ -478,10 +535,15 @@ impl Service {
}
#[tracing::instrument(skip(self))]
pub(crate) fn send_pdu_appservice(&self, appservice_id: String, pdu_id: Vec<u8>) -> Result<()> {
pub(crate) fn send_pdu_appservice(
&self,
appservice_id: String,
pdu_id: Vec<u8>,
) -> Result<()> {
let outgoing_kind = OutgoingKind::Appservice(appservice_id);
let event = SendingEventType::Pdu(pdu_id);
let keys = self.db.queue_requests(&[(&outgoing_kind, event.clone())])?;
let keys =
self.db.queue_requests(&[(&outgoing_kind, event.clone())])?;
self.sender
.send((outgoing_kind, event, keys.into_iter().next().unwrap()))
.unwrap();
@ -493,8 +555,9 @@ impl Service {
/// Used for instance after we remove an appservice registration
#[tracing::instrument(skip(self))]
pub(crate) fn cleanup_events(&self, appservice_id: String) -> Result<()> {
self.db
.delete_all_requests_for(&OutgoingKind::Appservice(appservice_id))?;
self.db.delete_all_requests_for(&OutgoingKind::Appservice(
appservice_id,
))?;
Ok(())
}
@ -511,18 +574,24 @@ impl Service {
for event in &events {
match event {
SendingEventType::Pdu(pdu_id) => {
pdu_jsons.push(services().rooms.timeline
.get_pdu_from_id(pdu_id)
.map_err(|e| (kind.clone(), e))?
.ok_or_else(|| {
(
kind.clone(),
Error::bad_database(
"[Appservice] Event in servernameevent_data not found in db.",
),
)
})?
.to_room_event());
pdu_jsons.push(
services()
.rooms
.timeline
.get_pdu_from_id(pdu_id)
.map_err(|e| (kind.clone(), e))?
.ok_or_else(|| {
(
kind.clone(),
Error::bad_database(
"[Appservice] Event in \
servernameevent_data not \
found in db.",
),
)
})?
.to_room_event(),
);
}
SendingEventType::Edu(_) => {
// Appservices don't need EDUs (?)
@ -530,7 +599,8 @@ impl Service {
}
}
let permit = services().sending.maximum_requests.acquire().await;
let permit =
services().sending.maximum_requests.acquire().await;
let response = match appservice_server::send_request(
services()
@ -541,20 +611,24 @@ impl Service {
(
kind.clone(),
Error::bad_database(
"[Appservice] Could not load registration from db.",
"[Appservice] Could not load registration \
from db.",
),
)
})?,
appservice::event::push_events::v1::Request {
events: pdu_jsons,
txn_id: (&*general_purpose::URL_SAFE_NO_PAD.encode(calculate_hash(
&events
.iter()
.map(|e| match e {
SendingEventType::Edu(b) | SendingEventType::Pdu(b) => &**b,
})
.collect::<Vec<_>>(),
)))
txn_id: (&*general_purpose::URL_SAFE_NO_PAD.encode(
calculate_hash(
&events
.iter()
.map(|e| match e {
SendingEventType::Edu(b)
| SendingEventType::Pdu(b) => &**b,
})
.collect::<Vec<_>>(),
),
))
.into(),
},
)
@ -575,7 +649,8 @@ impl Service {
match event {
SendingEventType::Pdu(pdu_id) => {
pdus.push(
services().rooms
services()
.rooms
.timeline
.get_pdu_from_id(pdu_id)
.map_err(|e| (kind.clone(), e))?
@ -583,7 +658,9 @@ impl Service {
(
kind.clone(),
Error::bad_database(
"[Push] Event in servernamevent_datas not found in db.",
"[Push] Event in \
servernamevent_datas not \
found in db.",
),
)
})?,
@ -595,10 +672,13 @@ impl Service {
}
for pdu in pdus {
// Redacted events are not notification targets (we don't send push for them)
// Redacted events are not notification targets (we don't
// send push for them)
if let Some(unsigned) = &pdu.unsigned {
if let Ok(unsigned) =
serde_json::from_str::<serde_json::Value>(unsigned.get())
serde_json::from_str::<serde_json::Value>(
unsigned.get(),
)
{
if unsigned.get("redacted_because").is_some() {
continue;
@ -609,7 +689,15 @@ impl Service {
let Some(pusher) = services()
.pusher
.get_pusher(userid, pushkey)
.map_err(|e| (OutgoingKind::Push(userid.clone(), pushkey.clone()), e))?
.map_err(|e| {
(
OutgoingKind::Push(
userid.clone(),
pushkey.clone(),
),
e,
)
})?
else {
continue;
};
@ -619,10 +707,15 @@ impl Service {
.get(
None,
userid,
GlobalAccountDataEventType::PushRules.to_string().into(),
GlobalAccountDataEventType::PushRules
.to_string()
.into(),
)
.unwrap_or_default()
.and_then(|event| serde_json::from_str::<PushRulesEvent>(event.get()).ok())
.and_then(|event| {
serde_json::from_str::<PushRulesEvent>(event.get())
.ok()
})
.map_or_else(
|| push::Ruleset::server_default(userid),
|ev: PushRulesEvent| ev.content.global,
@ -636,11 +729,18 @@ impl Service {
.try_into()
.expect("notification count can't go that high");
let permit = services().sending.maximum_requests.acquire().await;
let permit =
services().sending.maximum_requests.acquire().await;
let _response = services()
.pusher
.send_push_notice(userid, unread, &pusher, rules_for_user, &pdu)
.send_push_notice(
userid,
unread,
&pusher,
rules_for_user,
&pdu,
)
.await
.map(|_response| kind.clone())
.map_err(|e| (kind.clone(), e));
@ -656,22 +756,39 @@ impl Service {
for event in &events {
match event {
SendingEventType::Pdu(pdu_id) => {
// TODO: check room version and remove event_id if needed
let raw = PduEvent::convert_to_outgoing_federation_event(
services().rooms
.timeline
.get_pdu_json_from_id(pdu_id)
.map_err(|e| (OutgoingKind::Normal(server.clone()), e))?
.ok_or_else(|| {
error!("event not found: {server} {pdu_id:?}");
(
OutgoingKind::Normal(server.clone()),
Error::bad_database(
"[Normal] Event in servernamevent_datas not found in db.",
),
)
})?,
);
// TODO: check room version and remove event_id if
// needed
let raw =
PduEvent::convert_to_outgoing_federation_event(
services()
.rooms
.timeline
.get_pdu_json_from_id(pdu_id)
.map_err(|e| {
(
OutgoingKind::Normal(
server.clone(),
),
e,
)
})?
.ok_or_else(|| {
error!(
"event not found: {server} \
{pdu_id:?}"
);
(
OutgoingKind::Normal(
server.clone(),
),
Error::bad_database(
"[Normal] Event in \
servernamevent_datas not \
found in db.",
),
)
})?,
);
pdu_jsons.push(raw);
}
SendingEventType::Edu(edu) => {
@ -682,7 +799,8 @@ impl Service {
}
}
let permit = services().sending.maximum_requests.acquire().await;
let permit =
services().sending.maximum_requests.acquire().await;
let response = server_server::send_request(
server,
@ -691,16 +809,16 @@ impl Service {
pdus: pdu_jsons,
edus: edu_jsons,
origin_server_ts: MilliSecondsSinceUnixEpoch::now(),
transaction_id: (&*general_purpose::URL_SAFE_NO_PAD.encode(
calculate_hash(
transaction_id: (&*general_purpose::URL_SAFE_NO_PAD
.encode(calculate_hash(
&events
.iter()
.map(|e| match e {
SendingEventType::Edu(b) | SendingEventType::Pdu(b) => &**b,
SendingEventType::Edu(b)
| SendingEventType::Pdu(b) => &**b,
})
.collect::<Vec<_>>(),
),
))
)))
.into(),
},
)
@ -750,7 +868,8 @@ impl Service {
/// Sends a request to an appservice
///
/// Only returns None if there is no url specified in the appservice registration file
/// Only returns None if there is no url specified in the appservice
/// registration file
#[tracing::instrument(skip(self, registration, request))]
pub(crate) async fn send_appservice_request<T: OutgoingRequest>(
&self,
@ -761,7 +880,8 @@ impl Service {
T: Debug,
{
let permit = self.maximum_requests.acquire().await;
let response = appservice_server::send_request(registration, request).await;
let response =
appservice_server::send_request(registration, request).await;
drop(permit);
response