mirror of
https://gitlab.computer.surgery/matrix/grapevine.git
synced 2025-12-17 15:51:23 +01:00
enable if_not_else lint
This commit is contained in:
parent
a32b7c1ac1
commit
623824dc0c
6 changed files with 288 additions and 287 deletions
|
|
@ -35,6 +35,7 @@ flat_map_option = "warn"
|
|||
float_cmp_const = "warn"
|
||||
format_push_string = "warn"
|
||||
get_unwrap = "warn"
|
||||
if_not_else = "warn"
|
||||
if_then_some_else_none = "warn"
|
||||
impl_trait_in_params = "warn"
|
||||
let_underscore_must_use = "warn"
|
||||
|
|
|
|||
|
|
@ -545,11 +545,257 @@ async fn join_room_by_id_helper(
|
|||
let state_lock = mutex_state.lock().await;
|
||||
|
||||
// Ask a remote server if we are not participating in this room
|
||||
if !services()
|
||||
if services()
|
||||
.rooms
|
||||
.state_cache
|
||||
.server_in_room(services().globals.server_name(), room_id)?
|
||||
{
|
||||
info!("We can join locally");
|
||||
|
||||
let join_rules_event = services().rooms.state_accessor.room_state_get(
|
||||
room_id,
|
||||
&StateEventType::RoomJoinRules,
|
||||
"",
|
||||
)?;
|
||||
|
||||
let join_rules_event_content: Option<RoomJoinRulesEventContent> = join_rules_event
|
||||
.as_ref()
|
||||
.map(|join_rules_event| {
|
||||
serde_json::from_str(join_rules_event.content.get()).map_err(|e| {
|
||||
warn!("Invalid join rules event: {}", e);
|
||||
Error::bad_database("Invalid join rules event in db.")
|
||||
})
|
||||
})
|
||||
.transpose()?;
|
||||
|
||||
let restriction_rooms = match join_rules_event_content {
|
||||
Some(RoomJoinRulesEventContent {
|
||||
join_rule: JoinRule::Restricted(restricted),
|
||||
})
|
||||
| Some(RoomJoinRulesEventContent {
|
||||
join_rule: JoinRule::KnockRestricted(restricted),
|
||||
}) => restricted
|
||||
.allow
|
||||
.into_iter()
|
||||
.filter_map(|a| match a {
|
||||
AllowRule::RoomMembership(r) => Some(r.room_id),
|
||||
_ => None,
|
||||
})
|
||||
.collect(),
|
||||
_ => Vec::new(),
|
||||
};
|
||||
|
||||
let authorized_user = if restriction_rooms.iter().any(|restriction_room_id| {
|
||||
services()
|
||||
.rooms
|
||||
.state_cache
|
||||
.is_joined(sender_user, restriction_room_id)
|
||||
.unwrap_or(false)
|
||||
}) {
|
||||
let mut auth_user = None;
|
||||
for user in services()
|
||||
.rooms
|
||||
.state_cache
|
||||
.room_members(room_id)
|
||||
.filter_map(Result::ok)
|
||||
.collect::<Vec<_>>()
|
||||
{
|
||||
if user.server_name() == services().globals.server_name()
|
||||
&& services()
|
||||
.rooms
|
||||
.state_accessor
|
||||
.user_can_invite(room_id, &user, sender_user, &state_lock)
|
||||
.await
|
||||
.unwrap_or(false)
|
||||
{
|
||||
auth_user = Some(user);
|
||||
break;
|
||||
}
|
||||
}
|
||||
auth_user
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let event = RoomMemberEventContent {
|
||||
membership: MembershipState::Join,
|
||||
displayname: services().users.displayname(sender_user)?,
|
||||
avatar_url: services().users.avatar_url(sender_user)?,
|
||||
is_direct: None,
|
||||
third_party_invite: None,
|
||||
blurhash: services().users.blurhash(sender_user)?,
|
||||
reason: reason.clone(),
|
||||
join_authorized_via_users_server: authorized_user,
|
||||
};
|
||||
|
||||
// Try normal join first
|
||||
let error = match services()
|
||||
.rooms
|
||||
.timeline
|
||||
.build_and_append_pdu(
|
||||
PduBuilder {
|
||||
event_type: TimelineEventType::RoomMember,
|
||||
content: to_raw_value(&event).expect("event is valid, we just created it"),
|
||||
unsigned: None,
|
||||
state_key: Some(sender_user.to_string()),
|
||||
redacts: None,
|
||||
},
|
||||
sender_user,
|
||||
room_id,
|
||||
&state_lock,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(_event_id) => return Ok(join_room_by_id::v3::Response::new(room_id.to_owned())),
|
||||
Err(e) => e,
|
||||
};
|
||||
|
||||
if restriction_rooms.is_empty()
|
||||
&& servers
|
||||
.iter()
|
||||
.any(|s| *s != services().globals.server_name())
|
||||
{
|
||||
return Err(error);
|
||||
}
|
||||
|
||||
info!(
|
||||
"We couldn't do the join locally, maybe federation can help to satisfy the restricted join requirements"
|
||||
);
|
||||
let (make_join_response, remote_server) =
|
||||
make_join_request(sender_user, room_id, servers).await?;
|
||||
|
||||
let room_version_id = match make_join_response.room_version {
|
||||
Some(room_version_id)
|
||||
if services()
|
||||
.globals
|
||||
.supported_room_versions()
|
||||
.contains(&room_version_id) =>
|
||||
{
|
||||
room_version_id
|
||||
}
|
||||
_ => return Err(Error::BadServerResponse("Room version is not supported")),
|
||||
};
|
||||
let mut join_event_stub: CanonicalJsonObject =
|
||||
serde_json::from_str(make_join_response.event.get()).map_err(|_| {
|
||||
Error::BadServerResponse("Invalid make_join event json received from server.")
|
||||
})?;
|
||||
let join_authorized_via_users_server = join_event_stub
|
||||
.get("content")
|
||||
.map(|s| {
|
||||
s.as_object()?
|
||||
.get("join_authorised_via_users_server")?
|
||||
.as_str()
|
||||
})
|
||||
.and_then(|s| OwnedUserId::try_from(s.unwrap_or_default()).ok());
|
||||
// TODO: Is origin needed?
|
||||
join_event_stub.insert(
|
||||
"origin".to_owned(),
|
||||
CanonicalJsonValue::String(services().globals.server_name().as_str().to_owned()),
|
||||
);
|
||||
join_event_stub.insert(
|
||||
"origin_server_ts".to_owned(),
|
||||
CanonicalJsonValue::Integer(
|
||||
utils::millis_since_unix_epoch()
|
||||
.try_into()
|
||||
.expect("Timestamp is valid js_int value"),
|
||||
),
|
||||
);
|
||||
join_event_stub.insert(
|
||||
"content".to_owned(),
|
||||
to_canonical_value(RoomMemberEventContent {
|
||||
membership: MembershipState::Join,
|
||||
displayname: services().users.displayname(sender_user)?,
|
||||
avatar_url: services().users.avatar_url(sender_user)?,
|
||||
is_direct: None,
|
||||
third_party_invite: None,
|
||||
blurhash: services().users.blurhash(sender_user)?,
|
||||
reason,
|
||||
join_authorized_via_users_server,
|
||||
})
|
||||
.expect("event is valid, we just created it"),
|
||||
);
|
||||
|
||||
// We don't leave the event id in the pdu because that's only allowed in v1 or v2 rooms
|
||||
join_event_stub.remove("event_id");
|
||||
|
||||
// In order to create a compatible ref hash (EventID) the `hashes` field needs to be present
|
||||
ruma::signatures::hash_and_sign_event(
|
||||
services().globals.server_name().as_str(),
|
||||
services().globals.keypair(),
|
||||
&mut join_event_stub,
|
||||
&room_version_id,
|
||||
)
|
||||
.expect("event is valid, we just created it");
|
||||
|
||||
// Generate event id
|
||||
let event_id = format!(
|
||||
"${}",
|
||||
ruma::signatures::reference_hash(&join_event_stub, &room_version_id)
|
||||
.expect("ruma can calculate reference hashes")
|
||||
);
|
||||
let event_id = <&EventId>::try_from(event_id.as_str())
|
||||
.expect("ruma's reference hashes are valid event ids");
|
||||
|
||||
// Add event_id back
|
||||
join_event_stub.insert(
|
||||
"event_id".to_owned(),
|
||||
CanonicalJsonValue::String(event_id.as_str().to_owned()),
|
||||
);
|
||||
|
||||
// It has enough fields to be called a proper event now
|
||||
let join_event = join_event_stub;
|
||||
|
||||
let send_join_response = services()
|
||||
.sending
|
||||
.send_federation_request(
|
||||
&remote_server,
|
||||
federation::membership::create_join_event::v2::Request {
|
||||
room_id: room_id.to_owned(),
|
||||
event_id: event_id.to_owned(),
|
||||
pdu: PduEvent::convert_to_outgoing_federation_event(join_event.clone()),
|
||||
omit_members: false,
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
|
||||
let Some(signed_raw) = send_join_response.room_state.event else {
|
||||
return Err(error);
|
||||
};
|
||||
|
||||
let (signed_event_id, signed_value) =
|
||||
match gen_event_id_canonical_json(&signed_raw, &room_version_id) {
|
||||
Ok(t) => t,
|
||||
Err(_) => {
|
||||
// Event could not be converted to canonical json
|
||||
return Err(Error::BadRequest(
|
||||
ErrorKind::InvalidParam,
|
||||
"Could not convert event to canonical json.",
|
||||
));
|
||||
}
|
||||
};
|
||||
|
||||
if signed_event_id != event_id {
|
||||
return Err(Error::BadRequest(
|
||||
ErrorKind::InvalidParam,
|
||||
"Server sent event with wrong event id",
|
||||
));
|
||||
}
|
||||
|
||||
drop(state_lock);
|
||||
let pub_key_map = RwLock::new(BTreeMap::new());
|
||||
services()
|
||||
.rooms
|
||||
.event_handler
|
||||
.handle_incoming_pdu(
|
||||
&remote_server,
|
||||
&signed_event_id,
|
||||
room_id,
|
||||
signed_value,
|
||||
true,
|
||||
&pub_key_map,
|
||||
)
|
||||
.await?;
|
||||
} else {
|
||||
info!("Joining {room_id} over federation.");
|
||||
|
||||
let (make_join_response, remote_server) =
|
||||
|
|
@ -851,252 +1097,6 @@ async fn join_room_by_id_helper(
|
|||
.rooms
|
||||
.state
|
||||
.set_room_state(room_id, statehash_after_join, &state_lock)?;
|
||||
} else {
|
||||
info!("We can join locally");
|
||||
|
||||
let join_rules_event = services().rooms.state_accessor.room_state_get(
|
||||
room_id,
|
||||
&StateEventType::RoomJoinRules,
|
||||
"",
|
||||
)?;
|
||||
|
||||
let join_rules_event_content: Option<RoomJoinRulesEventContent> = join_rules_event
|
||||
.as_ref()
|
||||
.map(|join_rules_event| {
|
||||
serde_json::from_str(join_rules_event.content.get()).map_err(|e| {
|
||||
warn!("Invalid join rules event: {}", e);
|
||||
Error::bad_database("Invalid join rules event in db.")
|
||||
})
|
||||
})
|
||||
.transpose()?;
|
||||
|
||||
let restriction_rooms = match join_rules_event_content {
|
||||
Some(RoomJoinRulesEventContent {
|
||||
join_rule: JoinRule::Restricted(restricted),
|
||||
})
|
||||
| Some(RoomJoinRulesEventContent {
|
||||
join_rule: JoinRule::KnockRestricted(restricted),
|
||||
}) => restricted
|
||||
.allow
|
||||
.into_iter()
|
||||
.filter_map(|a| match a {
|
||||
AllowRule::RoomMembership(r) => Some(r.room_id),
|
||||
_ => None,
|
||||
})
|
||||
.collect(),
|
||||
_ => Vec::new(),
|
||||
};
|
||||
|
||||
let authorized_user = if restriction_rooms.iter().any(|restriction_room_id| {
|
||||
services()
|
||||
.rooms
|
||||
.state_cache
|
||||
.is_joined(sender_user, restriction_room_id)
|
||||
.unwrap_or(false)
|
||||
}) {
|
||||
let mut auth_user = None;
|
||||
for user in services()
|
||||
.rooms
|
||||
.state_cache
|
||||
.room_members(room_id)
|
||||
.filter_map(Result::ok)
|
||||
.collect::<Vec<_>>()
|
||||
{
|
||||
if user.server_name() == services().globals.server_name()
|
||||
&& services()
|
||||
.rooms
|
||||
.state_accessor
|
||||
.user_can_invite(room_id, &user, sender_user, &state_lock)
|
||||
.await
|
||||
.unwrap_or(false)
|
||||
{
|
||||
auth_user = Some(user);
|
||||
break;
|
||||
}
|
||||
}
|
||||
auth_user
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let event = RoomMemberEventContent {
|
||||
membership: MembershipState::Join,
|
||||
displayname: services().users.displayname(sender_user)?,
|
||||
avatar_url: services().users.avatar_url(sender_user)?,
|
||||
is_direct: None,
|
||||
third_party_invite: None,
|
||||
blurhash: services().users.blurhash(sender_user)?,
|
||||
reason: reason.clone(),
|
||||
join_authorized_via_users_server: authorized_user,
|
||||
};
|
||||
|
||||
// Try normal join first
|
||||
let error = match services()
|
||||
.rooms
|
||||
.timeline
|
||||
.build_and_append_pdu(
|
||||
PduBuilder {
|
||||
event_type: TimelineEventType::RoomMember,
|
||||
content: to_raw_value(&event).expect("event is valid, we just created it"),
|
||||
unsigned: None,
|
||||
state_key: Some(sender_user.to_string()),
|
||||
redacts: None,
|
||||
},
|
||||
sender_user,
|
||||
room_id,
|
||||
&state_lock,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(_event_id) => return Ok(join_room_by_id::v3::Response::new(room_id.to_owned())),
|
||||
Err(e) => e,
|
||||
};
|
||||
|
||||
if !restriction_rooms.is_empty()
|
||||
&& servers
|
||||
.iter()
|
||||
.any(|s| *s != services().globals.server_name())
|
||||
{
|
||||
info!(
|
||||
"We couldn't do the join locally, maybe federation can help to satisfy the restricted join requirements"
|
||||
);
|
||||
let (make_join_response, remote_server) =
|
||||
make_join_request(sender_user, room_id, servers).await?;
|
||||
|
||||
let room_version_id = match make_join_response.room_version {
|
||||
Some(room_version_id)
|
||||
if services()
|
||||
.globals
|
||||
.supported_room_versions()
|
||||
.contains(&room_version_id) =>
|
||||
{
|
||||
room_version_id
|
||||
}
|
||||
_ => return Err(Error::BadServerResponse("Room version is not supported")),
|
||||
};
|
||||
let mut join_event_stub: CanonicalJsonObject =
|
||||
serde_json::from_str(make_join_response.event.get()).map_err(|_| {
|
||||
Error::BadServerResponse("Invalid make_join event json received from server.")
|
||||
})?;
|
||||
let join_authorized_via_users_server = join_event_stub
|
||||
.get("content")
|
||||
.map(|s| {
|
||||
s.as_object()?
|
||||
.get("join_authorised_via_users_server")?
|
||||
.as_str()
|
||||
})
|
||||
.and_then(|s| OwnedUserId::try_from(s.unwrap_or_default()).ok());
|
||||
// TODO: Is origin needed?
|
||||
join_event_stub.insert(
|
||||
"origin".to_owned(),
|
||||
CanonicalJsonValue::String(services().globals.server_name().as_str().to_owned()),
|
||||
);
|
||||
join_event_stub.insert(
|
||||
"origin_server_ts".to_owned(),
|
||||
CanonicalJsonValue::Integer(
|
||||
utils::millis_since_unix_epoch()
|
||||
.try_into()
|
||||
.expect("Timestamp is valid js_int value"),
|
||||
),
|
||||
);
|
||||
join_event_stub.insert(
|
||||
"content".to_owned(),
|
||||
to_canonical_value(RoomMemberEventContent {
|
||||
membership: MembershipState::Join,
|
||||
displayname: services().users.displayname(sender_user)?,
|
||||
avatar_url: services().users.avatar_url(sender_user)?,
|
||||
is_direct: None,
|
||||
third_party_invite: None,
|
||||
blurhash: services().users.blurhash(sender_user)?,
|
||||
reason,
|
||||
join_authorized_via_users_server,
|
||||
})
|
||||
.expect("event is valid, we just created it"),
|
||||
);
|
||||
|
||||
// We don't leave the event id in the pdu because that's only allowed in v1 or v2 rooms
|
||||
join_event_stub.remove("event_id");
|
||||
|
||||
// In order to create a compatible ref hash (EventID) the `hashes` field needs to be present
|
||||
ruma::signatures::hash_and_sign_event(
|
||||
services().globals.server_name().as_str(),
|
||||
services().globals.keypair(),
|
||||
&mut join_event_stub,
|
||||
&room_version_id,
|
||||
)
|
||||
.expect("event is valid, we just created it");
|
||||
|
||||
// Generate event id
|
||||
let event_id = format!(
|
||||
"${}",
|
||||
ruma::signatures::reference_hash(&join_event_stub, &room_version_id)
|
||||
.expect("ruma can calculate reference hashes")
|
||||
);
|
||||
let event_id = <&EventId>::try_from(event_id.as_str())
|
||||
.expect("ruma's reference hashes are valid event ids");
|
||||
|
||||
// Add event_id back
|
||||
join_event_stub.insert(
|
||||
"event_id".to_owned(),
|
||||
CanonicalJsonValue::String(event_id.as_str().to_owned()),
|
||||
);
|
||||
|
||||
// It has enough fields to be called a proper event now
|
||||
let join_event = join_event_stub;
|
||||
|
||||
let send_join_response = services()
|
||||
.sending
|
||||
.send_federation_request(
|
||||
&remote_server,
|
||||
federation::membership::create_join_event::v2::Request {
|
||||
room_id: room_id.to_owned(),
|
||||
event_id: event_id.to_owned(),
|
||||
pdu: PduEvent::convert_to_outgoing_federation_event(join_event.clone()),
|
||||
omit_members: false,
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
|
||||
if let Some(signed_raw) = send_join_response.room_state.event {
|
||||
let (signed_event_id, signed_value) =
|
||||
match gen_event_id_canonical_json(&signed_raw, &room_version_id) {
|
||||
Ok(t) => t,
|
||||
Err(_) => {
|
||||
// Event could not be converted to canonical json
|
||||
return Err(Error::BadRequest(
|
||||
ErrorKind::InvalidParam,
|
||||
"Could not convert event to canonical json.",
|
||||
));
|
||||
}
|
||||
};
|
||||
|
||||
if signed_event_id != event_id {
|
||||
return Err(Error::BadRequest(
|
||||
ErrorKind::InvalidParam,
|
||||
"Server sent event with wrong event id",
|
||||
));
|
||||
}
|
||||
|
||||
drop(state_lock);
|
||||
let pub_key_map = RwLock::new(BTreeMap::new());
|
||||
services()
|
||||
.rooms
|
||||
.event_handler
|
||||
.handle_incoming_pdu(
|
||||
&remote_server,
|
||||
&signed_event_id,
|
||||
room_id,
|
||||
signed_value,
|
||||
true,
|
||||
&pub_key_map,
|
||||
)
|
||||
.await?;
|
||||
} else {
|
||||
return Err(error);
|
||||
}
|
||||
} else {
|
||||
return Err(error);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(join_room_by_id::v3::Response::new(room_id.to_owned()))
|
||||
|
|
@ -1414,35 +1414,11 @@ pub(crate) async fn leave_room(
|
|||
reason: Option<String>,
|
||||
) -> Result<()> {
|
||||
// Ask a remote server if we don't have this room
|
||||
if !services()
|
||||
if services()
|
||||
.rooms
|
||||
.state_cache
|
||||
.server_in_room(services().globals.server_name(), room_id)?
|
||||
{
|
||||
if let Err(e) = remote_leave_room(user_id, room_id).await {
|
||||
warn!("Failed to leave room {} remotely: {}", user_id, e);
|
||||
// Don't tell the client about this error
|
||||
}
|
||||
|
||||
let last_state = services()
|
||||
.rooms
|
||||
.state_cache
|
||||
.invite_state(user_id, room_id)?
|
||||
.map_or_else(
|
||||
|| services().rooms.state_cache.left_state(user_id, room_id),
|
||||
|s| Ok(Some(s)),
|
||||
)?;
|
||||
|
||||
// We always drop the invite, we can't rely on other servers
|
||||
services().rooms.state_cache.update_membership(
|
||||
room_id,
|
||||
user_id,
|
||||
MembershipState::Leave,
|
||||
user_id,
|
||||
last_state,
|
||||
true,
|
||||
)?;
|
||||
} else {
|
||||
let mutex_state = Arc::clone(
|
||||
services()
|
||||
.globals
|
||||
|
|
@ -1501,6 +1477,30 @@ pub(crate) async fn leave_room(
|
|||
&state_lock,
|
||||
)
|
||||
.await?;
|
||||
} else {
|
||||
if let Err(e) = remote_leave_room(user_id, room_id).await {
|
||||
warn!("Failed to leave room {} remotely: {}", user_id, e);
|
||||
// Don't tell the client about this error
|
||||
}
|
||||
|
||||
let last_state = services()
|
||||
.rooms
|
||||
.state_cache
|
||||
.invite_state(user_id, room_id)?
|
||||
.map_or_else(
|
||||
|| services().rooms.state_cache.left_state(user_id, room_id),
|
||||
|s| Ok(Some(s)),
|
||||
)?;
|
||||
|
||||
// We always drop the invite, we can't rely on other servers
|
||||
services().rooms.state_cache.update_membership(
|
||||
room_id,
|
||||
user_id,
|
||||
MembershipState::Leave,
|
||||
user_id,
|
||||
last_state,
|
||||
true,
|
||||
)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
|
|
|||
|
|
@ -94,7 +94,9 @@ pub(crate) async fn sync_events_route(
|
|||
rx
|
||||
}
|
||||
Entry::Occupied(mut o) => {
|
||||
if o.get().0 != body.since {
|
||||
if o.get().0 == body.since {
|
||||
o.get().1.clone()
|
||||
} else {
|
||||
let (tx, rx) = tokio::sync::watch::channel(None);
|
||||
|
||||
o.insert((body.since.clone(), rx.clone()));
|
||||
|
|
@ -109,8 +111,6 @@ pub(crate) async fn sync_events_route(
|
|||
));
|
||||
|
||||
rx
|
||||
} else {
|
||||
o.get().1.clone()
|
||||
}
|
||||
}
|
||||
};
|
||||
|
|
|
|||
|
|
@ -17,7 +17,12 @@ pub(crate) async fn turn_server_route(
|
|||
|
||||
let turn_secret = services().globals.turn_secret().clone();
|
||||
|
||||
let (username, password) = if !turn_secret.is_empty() {
|
||||
let (username, password) = if turn_secret.is_empty() {
|
||||
(
|
||||
services().globals.turn_username().clone(),
|
||||
services().globals.turn_password().clone(),
|
||||
)
|
||||
} else {
|
||||
let expiry = SecondsSinceUnixEpoch::from_system_time(
|
||||
SystemTime::now() + Duration::from_secs(services().globals.turn_ttl()),
|
||||
)
|
||||
|
|
@ -32,11 +37,6 @@ pub(crate) async fn turn_server_route(
|
|||
let password: String = general_purpose::STANDARD.encode(mac.finalize().into_bytes());
|
||||
|
||||
(username, password)
|
||||
} else {
|
||||
(
|
||||
services().globals.turn_username().clone(),
|
||||
services().globals.turn_password().clone(),
|
||||
)
|
||||
};
|
||||
|
||||
Ok(get_turn_server_info::v3::Response {
|
||||
|
|
|
|||
|
|
@ -66,10 +66,10 @@ impl Service {
|
|||
while stack.last().map_or(false, |s| s.is_empty()) {
|
||||
stack.pop();
|
||||
}
|
||||
if !stack.is_empty() {
|
||||
stack.last_mut().and_then(|s| s.pop())
|
||||
} else {
|
||||
if stack.is_empty() {
|
||||
None
|
||||
} else {
|
||||
stack.last_mut().and_then(|s| s.pop())
|
||||
}
|
||||
} {
|
||||
rooms_in_path.push(current_room.clone());
|
||||
|
|
|
|||
|
|
@ -160,7 +160,9 @@ impl Service {
|
|||
// Find events that have been added since starting the last request
|
||||
let new_events = self.db.queued_requests(&outgoing_kind).filter_map(|r| r.ok()).take(30).collect::<Vec<_>>();
|
||||
|
||||
if !new_events.is_empty() {
|
||||
if new_events.is_empty() {
|
||||
current_transaction_status.remove(&outgoing_kind);
|
||||
} else {
|
||||
// Insert pdus we found
|
||||
self.db.mark_as_active(&new_events)?;
|
||||
|
||||
|
|
@ -170,8 +172,6 @@ impl Service {
|
|||
new_events.into_iter().map(|(event, _)| event).collect(),
|
||||
)
|
||||
);
|
||||
} else {
|
||||
current_transaction_status.remove(&outgoing_kind);
|
||||
}
|
||||
}
|
||||
Err((outgoing_kind, _)) => {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue