mirror of
https://gitlab.computer.surgery/matrix/grapevine.git
synced 2025-12-17 15:51:23 +01:00
enable single_match_else lint
Also collapses some if statements in the federation name resolution code
This commit is contained in:
parent
a62fa7b6ee
commit
75358340bb
6 changed files with 211 additions and 236 deletions
|
|
@ -402,39 +402,36 @@ pub(crate) async fn get_keys_helper<F: Fn(&UserId) -> bool>(
|
|||
.collect();
|
||||
|
||||
while let Some((server, response)) = futures.next().await {
|
||||
match response {
|
||||
Ok(Ok(response)) => {
|
||||
for (user, masterkey) in response.master_keys {
|
||||
let (master_key_id, mut master_key) =
|
||||
services().users.parse_master_key(&user, &masterkey)?;
|
||||
if let Ok(Ok(response)) = response {
|
||||
for (user, masterkey) in response.master_keys {
|
||||
let (master_key_id, mut master_key) =
|
||||
services().users.parse_master_key(&user, &masterkey)?;
|
||||
|
||||
if let Some(our_master_key) = services().users.get_key(
|
||||
&master_key_id,
|
||||
sender_user,
|
||||
&user,
|
||||
&allowed_signatures,
|
||||
)? {
|
||||
let (_, our_master_key) =
|
||||
services().users.parse_master_key(&user, &our_master_key)?;
|
||||
master_key.signatures.extend(our_master_key.signatures);
|
||||
}
|
||||
let json = serde_json::to_value(master_key).expect("to_value always works");
|
||||
let raw = serde_json::from_value(json).expect("Raw::from_value always works");
|
||||
services().users.add_cross_signing_keys(
|
||||
&user, &raw, &None, &None,
|
||||
false, // Dont notify. A notification would trigger another key request resulting in an endless loop
|
||||
)?;
|
||||
master_keys.insert(user, raw);
|
||||
if let Some(our_master_key) = services().users.get_key(
|
||||
&master_key_id,
|
||||
sender_user,
|
||||
&user,
|
||||
&allowed_signatures,
|
||||
)? {
|
||||
let (_, our_master_key) =
|
||||
services().users.parse_master_key(&user, &our_master_key)?;
|
||||
master_key.signatures.extend(our_master_key.signatures);
|
||||
}
|
||||
|
||||
self_signing_keys.extend(response.self_signing_keys);
|
||||
device_keys.extend(response.device_keys);
|
||||
let json = serde_json::to_value(master_key).expect("to_value always works");
|
||||
let raw = serde_json::from_value(json).expect("Raw::from_value always works");
|
||||
services().users.add_cross_signing_keys(
|
||||
&user, &raw, &None, &None,
|
||||
false, // Dont notify. A notification would trigger another key request resulting in an endless loop
|
||||
)?;
|
||||
master_keys.insert(user, raw);
|
||||
}
|
||||
_ => {
|
||||
back_off(server.to_owned()).await;
|
||||
|
||||
failures.insert(server.to_string(), json!({}));
|
||||
}
|
||||
self_signing_keys.extend(response.self_signing_keys);
|
||||
device_keys.extend(response.device_keys);
|
||||
} else {
|
||||
back_off(server.to_owned()).await;
|
||||
|
||||
failures.insert(server.to_string(), json!({}));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -359,100 +359,84 @@ async fn find_actual_destination(destination: &'_ ServerName) -> (FedDest, FedDe
|
|||
FedDest::Named(host.to_owned(), port.to_owned())
|
||||
} else {
|
||||
debug!("Requesting well known for {destination}");
|
||||
match request_well_known(destination.as_str()).await {
|
||||
Some(delegated_hostname) => {
|
||||
debug!("3: A .well-known file is available");
|
||||
hostname = add_port_to_hostname(&delegated_hostname).into_uri_string();
|
||||
match get_ip_with_port(&delegated_hostname) {
|
||||
Some(host_and_port) => host_and_port, // 3.1: IP literal in .well-known file
|
||||
None => {
|
||||
if let Some(pos) = delegated_hostname.find(':') {
|
||||
debug!("3.2: Hostname with port in .well-known file");
|
||||
let (host, port) = delegated_hostname.split_at(pos);
|
||||
FedDest::Named(host.to_owned(), port.to_owned())
|
||||
} else {
|
||||
debug!("Delegated hostname has no port in this branch");
|
||||
if let Some(hostname_override) =
|
||||
query_srv_record(&delegated_hostname).await
|
||||
{
|
||||
debug!("3.3: SRV lookup successful");
|
||||
let force_port = hostname_override.port();
|
||||
if let Some(delegated_hostname) = request_well_known(destination.as_str()).await {
|
||||
debug!("3: A .well-known file is available");
|
||||
hostname = add_port_to_hostname(&delegated_hostname).into_uri_string();
|
||||
if let Some(host_and_port) = get_ip_with_port(&delegated_hostname) {
|
||||
host_and_port
|
||||
} else if let Some(pos) = delegated_hostname.find(':') {
|
||||
debug!("3.2: Hostname with port in .well-known file");
|
||||
let (host, port) = delegated_hostname.split_at(pos);
|
||||
FedDest::Named(host.to_owned(), port.to_owned())
|
||||
} else {
|
||||
debug!("Delegated hostname has no port in this branch");
|
||||
if let Some(hostname_override) = query_srv_record(&delegated_hostname).await
|
||||
{
|
||||
debug!("3.3: SRV lookup successful");
|
||||
let force_port = hostname_override.port();
|
||||
|
||||
if let Ok(override_ip) = services()
|
||||
.globals
|
||||
.dns_resolver()
|
||||
.lookup_ip(hostname_override.hostname())
|
||||
.await
|
||||
{
|
||||
services()
|
||||
.globals
|
||||
.tls_name_override
|
||||
.write()
|
||||
.unwrap()
|
||||
.insert(
|
||||
delegated_hostname.clone(),
|
||||
(
|
||||
override_ip.iter().collect(),
|
||||
force_port.unwrap_or(8448),
|
||||
),
|
||||
);
|
||||
} else {
|
||||
warn!("Using SRV record, but could not resolve to IP");
|
||||
}
|
||||
|
||||
if let Some(port) = force_port {
|
||||
FedDest::Named(delegated_hostname, format!(":{port}"))
|
||||
} else {
|
||||
add_port_to_hostname(&delegated_hostname)
|
||||
}
|
||||
} else {
|
||||
debug!("3.4: No SRV records, just use the hostname from .well-known");
|
||||
add_port_to_hostname(&delegated_hostname)
|
||||
}
|
||||
}
|
||||
if let Ok(override_ip) = services()
|
||||
.globals
|
||||
.dns_resolver()
|
||||
.lookup_ip(hostname_override.hostname())
|
||||
.await
|
||||
{
|
||||
services()
|
||||
.globals
|
||||
.tls_name_override
|
||||
.write()
|
||||
.unwrap()
|
||||
.insert(
|
||||
delegated_hostname.clone(),
|
||||
(override_ip.iter().collect(), force_port.unwrap_or(8448)),
|
||||
);
|
||||
} else {
|
||||
warn!("Using SRV record, but could not resolve to IP");
|
||||
}
|
||||
|
||||
if let Some(port) = force_port {
|
||||
FedDest::Named(delegated_hostname, format!(":{port}"))
|
||||
} else {
|
||||
add_port_to_hostname(&delegated_hostname)
|
||||
}
|
||||
} else {
|
||||
debug!("3.4: No SRV records, just use the hostname from .well-known");
|
||||
add_port_to_hostname(&delegated_hostname)
|
||||
}
|
||||
}
|
||||
None => {
|
||||
debug!("4: No .well-known or an error occured");
|
||||
match query_srv_record(&destination_str).await {
|
||||
Some(hostname_override) => {
|
||||
debug!("4: SRV record found");
|
||||
let force_port = hostname_override.port();
|
||||
} else {
|
||||
debug!("4: No .well-known or an error occured");
|
||||
if let Some(hostname_override) = query_srv_record(&destination_str).await {
|
||||
debug!("4: SRV record found");
|
||||
let force_port = hostname_override.port();
|
||||
|
||||
if let Ok(override_ip) = services()
|
||||
.globals
|
||||
.dns_resolver()
|
||||
.lookup_ip(hostname_override.hostname())
|
||||
.await
|
||||
{
|
||||
services()
|
||||
.globals
|
||||
.tls_name_override
|
||||
.write()
|
||||
.unwrap()
|
||||
.insert(
|
||||
hostname.clone(),
|
||||
(
|
||||
override_ip.iter().collect(),
|
||||
force_port.unwrap_or(8448),
|
||||
),
|
||||
);
|
||||
} else {
|
||||
warn!("Using SRV record, but could not resolve to IP");
|
||||
}
|
||||
|
||||
if let Some(port) = force_port {
|
||||
FedDest::Named(hostname.clone(), format!(":{port}"))
|
||||
} else {
|
||||
add_port_to_hostname(&hostname)
|
||||
}
|
||||
}
|
||||
None => {
|
||||
debug!("5: No SRV record found");
|
||||
add_port_to_hostname(&destination_str)
|
||||
}
|
||||
if let Ok(override_ip) = services()
|
||||
.globals
|
||||
.dns_resolver()
|
||||
.lookup_ip(hostname_override.hostname())
|
||||
.await
|
||||
{
|
||||
services()
|
||||
.globals
|
||||
.tls_name_override
|
||||
.write()
|
||||
.unwrap()
|
||||
.insert(
|
||||
hostname.clone(),
|
||||
(override_ip.iter().collect(), force_port.unwrap_or(8448)),
|
||||
);
|
||||
} else {
|
||||
warn!("Using SRV record, but could not resolve to IP");
|
||||
}
|
||||
|
||||
if let Some(port) = force_port {
|
||||
FedDest::Named(hostname.clone(), format!(":{port}"))
|
||||
} else {
|
||||
add_port_to_hostname(&hostname)
|
||||
}
|
||||
} else {
|
||||
debug!("5: No SRV record found");
|
||||
add_port_to_hostname(&destination_str)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1270,15 +1254,14 @@ pub(crate) async fn get_room_state_route(
|
|||
|
||||
Ok(get_room_state::v1::Response {
|
||||
auth_chain: auth_chain_ids
|
||||
.filter_map(
|
||||
|id| match services().rooms.timeline.get_pdu_json(&id).ok()? {
|
||||
Some(json) => Some(PduEvent::convert_to_outgoing_federation_event(json)),
|
||||
None => {
|
||||
error!("Could not find event json for {id} in db.");
|
||||
None
|
||||
}
|
||||
},
|
||||
)
|
||||
.filter_map(|id| {
|
||||
if let Some(json) = services().rooms.timeline.get_pdu_json(&id).ok()? {
|
||||
Some(PduEvent::convert_to_outgoing_federation_event(json))
|
||||
} else {
|
||||
error!("Could not find event json for {id} in db.");
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect(),
|
||||
pdus,
|
||||
})
|
||||
|
|
|
|||
|
|
@ -10,18 +10,18 @@ impl service::rooms::short::Data for KeyValueDatabase {
|
|||
return Ok(*short);
|
||||
}
|
||||
|
||||
let short = match self.eventid_shorteventid.get(event_id.as_bytes())? {
|
||||
Some(shorteventid) => utils::u64_from_bytes(&shorteventid)
|
||||
.map_err(|_| Error::bad_database("Invalid shorteventid in db."))?,
|
||||
None => {
|
||||
let short =
|
||||
if let Some(shorteventid) = self.eventid_shorteventid.get(event_id.as_bytes())? {
|
||||
utils::u64_from_bytes(&shorteventid)
|
||||
.map_err(|_| Error::bad_database("Invalid shorteventid in db."))?
|
||||
} else {
|
||||
let shorteventid = services().globals.next_count()?;
|
||||
self.eventid_shorteventid
|
||||
.insert(event_id.as_bytes(), &shorteventid.to_be_bytes())?;
|
||||
self.shorteventid_eventid
|
||||
.insert(&shorteventid.to_be_bytes(), event_id.as_bytes())?;
|
||||
shorteventid
|
||||
}
|
||||
};
|
||||
};
|
||||
|
||||
self.eventidshort_cache
|
||||
.lock()
|
||||
|
|
@ -86,17 +86,16 @@ impl service::rooms::short::Data for KeyValueDatabase {
|
|||
db_key.push(0xff);
|
||||
db_key.extend_from_slice(state_key.as_bytes());
|
||||
|
||||
let short = match self.statekey_shortstatekey.get(&db_key)? {
|
||||
Some(shortstatekey) => utils::u64_from_bytes(&shortstatekey)
|
||||
.map_err(|_| Error::bad_database("Invalid shortstatekey in db."))?,
|
||||
None => {
|
||||
let shortstatekey = services().globals.next_count()?;
|
||||
self.statekey_shortstatekey
|
||||
.insert(&db_key, &shortstatekey.to_be_bytes())?;
|
||||
self.shortstatekey_statekey
|
||||
.insert(&shortstatekey.to_be_bytes(), &db_key)?;
|
||||
shortstatekey
|
||||
}
|
||||
let short = if let Some(shortstatekey) = self.statekey_shortstatekey.get(&db_key)? {
|
||||
utils::u64_from_bytes(&shortstatekey)
|
||||
.map_err(|_| Error::bad_database("Invalid shortstatekey in db."))?
|
||||
} else {
|
||||
let shortstatekey = services().globals.next_count()?;
|
||||
self.statekey_shortstatekey
|
||||
.insert(&db_key, &shortstatekey.to_be_bytes())?;
|
||||
self.shortstatekey_statekey
|
||||
.insert(&shortstatekey.to_be_bytes(), &db_key)?;
|
||||
shortstatekey
|
||||
};
|
||||
|
||||
self.statekeyshort_cache
|
||||
|
|
@ -177,19 +176,20 @@ impl service::rooms::short::Data for KeyValueDatabase {
|
|||
|
||||
/// Returns `(shortstatehash, already_existed)`
|
||||
fn get_or_create_shortstatehash(&self, state_hash: &[u8]) -> Result<(u64, bool)> {
|
||||
Ok(match self.statehash_shortstatehash.get(state_hash)? {
|
||||
Some(shortstatehash) => (
|
||||
utils::u64_from_bytes(&shortstatehash)
|
||||
.map_err(|_| Error::bad_database("Invalid shortstatehash in db."))?,
|
||||
true,
|
||||
),
|
||||
None => {
|
||||
Ok(
|
||||
if let Some(shortstatehash) = self.statehash_shortstatehash.get(state_hash)? {
|
||||
(
|
||||
utils::u64_from_bytes(&shortstatehash)
|
||||
.map_err(|_| Error::bad_database("Invalid shortstatehash in db."))?,
|
||||
true,
|
||||
)
|
||||
} else {
|
||||
let shortstatehash = services().globals.next_count()?;
|
||||
self.statehash_shortstatehash
|
||||
.insert(state_hash, &shortstatehash.to_be_bytes())?;
|
||||
(shortstatehash, false)
|
||||
}
|
||||
})
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
fn get_shortroomid(&self, room_id: &RoomId) -> Result<Option<u64>> {
|
||||
|
|
@ -203,15 +203,16 @@ impl service::rooms::short::Data for KeyValueDatabase {
|
|||
}
|
||||
|
||||
fn get_or_create_shortroomid(&self, room_id: &RoomId) -> Result<u64> {
|
||||
Ok(match self.roomid_shortroomid.get(room_id.as_bytes())? {
|
||||
Some(short) => utils::u64_from_bytes(&short)
|
||||
.map_err(|_| Error::bad_database("Invalid shortroomid in db."))?,
|
||||
None => {
|
||||
Ok(
|
||||
if let Some(short) = self.roomid_shortroomid.get(room_id.as_bytes())? {
|
||||
utils::u64_from_bytes(&short)
|
||||
.map_err(|_| Error::bad_database("Invalid shortroomid in db."))?
|
||||
} else {
|
||||
let short = services().globals.next_count()?;
|
||||
self.roomid_shortroomid
|
||||
.insert(room_id.as_bytes(), &short.to_be_bytes())?;
|
||||
short
|
||||
}
|
||||
})
|
||||
},
|
||||
)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1153,7 +1153,7 @@ impl Service {
|
|||
}
|
||||
|
||||
info!("Fetching {} over federation.", next_id);
|
||||
match services()
|
||||
if let Ok(res) = services()
|
||||
.sending
|
||||
.send_federation_request(
|
||||
origin,
|
||||
|
|
@ -1163,44 +1163,41 @@ impl Service {
|
|||
)
|
||||
.await
|
||||
{
|
||||
Ok(res) => {
|
||||
info!("Got {} over federation", next_id);
|
||||
let Ok((calculated_event_id, value)) =
|
||||
pdu::gen_event_id_canonical_json(&res.pdu, room_version_id)
|
||||
else {
|
||||
back_off((*next_id).to_owned()).await;
|
||||
continue;
|
||||
};
|
||||
|
||||
if calculated_event_id != *next_id {
|
||||
warn!("Server didn't return event id we requested: requested: {}, we got {}. Event: {:?}",
|
||||
next_id, calculated_event_id, &res.pdu);
|
||||
}
|
||||
|
||||
if let Some(auth_events) =
|
||||
value.get("auth_events").and_then(|c| c.as_array())
|
||||
{
|
||||
for auth_event in auth_events {
|
||||
if let Ok(auth_event) =
|
||||
serde_json::from_value(auth_event.clone().into())
|
||||
{
|
||||
let a: Arc<EventId> = auth_event;
|
||||
todo_auth_events.push(a);
|
||||
} else {
|
||||
warn!("Auth event id is not valid");
|
||||
}
|
||||
}
|
||||
} else {
|
||||
warn!("Auth event list invalid");
|
||||
}
|
||||
|
||||
events_in_reverse_order.push((next_id.clone(), value));
|
||||
events_all.insert(next_id);
|
||||
}
|
||||
Err(_) => {
|
||||
warn!("Failed to fetch event: {}", next_id);
|
||||
info!("Got {} over federation", next_id);
|
||||
let Ok((calculated_event_id, value)) =
|
||||
pdu::gen_event_id_canonical_json(&res.pdu, room_version_id)
|
||||
else {
|
||||
back_off((*next_id).to_owned()).await;
|
||||
continue;
|
||||
};
|
||||
|
||||
if calculated_event_id != *next_id {
|
||||
warn!("Server didn't return event id we requested: requested: {}, we got {}. Event: {:?}",
|
||||
next_id, calculated_event_id, &res.pdu);
|
||||
}
|
||||
|
||||
if let Some(auth_events) =
|
||||
value.get("auth_events").and_then(|c| c.as_array())
|
||||
{
|
||||
for auth_event in auth_events {
|
||||
if let Ok(auth_event) =
|
||||
serde_json::from_value(auth_event.clone().into())
|
||||
{
|
||||
let a: Arc<EventId> = auth_event;
|
||||
todo_auth_events.push(a);
|
||||
} else {
|
||||
warn!("Auth event id is not valid");
|
||||
}
|
||||
}
|
||||
} else {
|
||||
warn!("Auth event list invalid");
|
||||
}
|
||||
|
||||
events_in_reverse_order.push((next_id.clone(), value));
|
||||
events_all.insert(next_id);
|
||||
} else {
|
||||
warn!("Failed to fetch event: {}", next_id);
|
||||
back_off((*next_id).to_owned()).await;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1670,18 +1667,17 @@ impl Service {
|
|||
.get(origin)
|
||||
.map(|s| Arc::clone(s).acquire_owned());
|
||||
|
||||
let permit = match permit {
|
||||
Some(p) => p,
|
||||
None => {
|
||||
let mut write = services().globals.servername_ratelimiter.write().await;
|
||||
let s = Arc::clone(
|
||||
write
|
||||
.entry(origin.to_owned())
|
||||
.or_insert_with(|| Arc::new(Semaphore::new(1))),
|
||||
);
|
||||
let permit = if let Some(p) = permit {
|
||||
p
|
||||
} else {
|
||||
let mut write = services().globals.servername_ratelimiter.write().await;
|
||||
let s = Arc::clone(
|
||||
write
|
||||
.entry(origin.to_owned())
|
||||
.or_insert_with(|| Arc::new(Semaphore::new(1))),
|
||||
);
|
||||
|
||||
s.acquire_owned()
|
||||
}
|
||||
s.acquire_owned()
|
||||
}
|
||||
.await;
|
||||
|
||||
|
|
|
|||
|
|
@ -305,41 +305,38 @@ impl Service {
|
|||
let event: AnySyncEphemeralRoomEvent =
|
||||
serde_json::from_str(read_receipt.json().get())
|
||||
.map_err(|_| Error::bad_database("Invalid edu event in read_receipts."))?;
|
||||
let federation_event = match event {
|
||||
AnySyncEphemeralRoomEvent::Receipt(r) => {
|
||||
let mut read = BTreeMap::new();
|
||||
let federation_event = if let AnySyncEphemeralRoomEvent::Receipt(r) = event {
|
||||
let mut read = BTreeMap::new();
|
||||
|
||||
let (event_id, mut receipt) = r
|
||||
.content
|
||||
.0
|
||||
.into_iter()
|
||||
.next()
|
||||
.expect("we only use one event per read receipt");
|
||||
let receipt = receipt
|
||||
.remove(&ReceiptType::Read)
|
||||
.expect("our read receipts always set this")
|
||||
.remove(&user_id)
|
||||
.expect("our read receipts always have the user here");
|
||||
let (event_id, mut receipt) = r
|
||||
.content
|
||||
.0
|
||||
.into_iter()
|
||||
.next()
|
||||
.expect("we only use one event per read receipt");
|
||||
let receipt = receipt
|
||||
.remove(&ReceiptType::Read)
|
||||
.expect("our read receipts always set this")
|
||||
.remove(&user_id)
|
||||
.expect("our read receipts always have the user here");
|
||||
|
||||
read.insert(
|
||||
user_id,
|
||||
ReceiptData {
|
||||
data: receipt.clone(),
|
||||
event_ids: vec![event_id.clone()],
|
||||
},
|
||||
);
|
||||
read.insert(
|
||||
user_id,
|
||||
ReceiptData {
|
||||
data: receipt.clone(),
|
||||
event_ids: vec![event_id.clone()],
|
||||
},
|
||||
);
|
||||
|
||||
let receipt_map = ReceiptMap { read };
|
||||
let receipt_map = ReceiptMap { read };
|
||||
|
||||
let mut receipts = BTreeMap::new();
|
||||
receipts.insert(room_id.clone(), receipt_map);
|
||||
let mut receipts = BTreeMap::new();
|
||||
receipts.insert(room_id.clone(), receipt_map);
|
||||
|
||||
Edu::Receipt(ReceiptContent { receipts })
|
||||
}
|
||||
_ => {
|
||||
Error::bad_database("Invalid event type in read_receipts");
|
||||
continue;
|
||||
}
|
||||
Edu::Receipt(ReceiptContent { receipts })
|
||||
} else {
|
||||
Error::bad_database("Invalid event type in read_receipts");
|
||||
continue;
|
||||
};
|
||||
|
||||
events.push(serde_json::to_vec(&federation_event).expect("json can be serialized"));
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue