handle cancellation in complement wrapper

What a mess lmao
This commit is contained in:
Benjamin Lee 2024-06-22 00:12:23 -07:00
parent f76806655f
commit 0eee282558
No known key found for this signature in database
GPG key ID: FB9624E2885D55A4
4 changed files with 236 additions and 12 deletions

44
Cargo.lock generated
View file

@ -391,6 +391,12 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "cfg_aliases"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fd16c4719339c4530435d38e511904438d07cce7950afa3718a84ac36c10e89e"
[[package]] [[package]]
name = "cfg_aliases" name = "cfg_aliases"
version = "0.2.1" version = "0.2.1"
@ -845,7 +851,7 @@ dependencies = [
"image", "image",
"jsonwebtoken", "jsonwebtoken",
"lru-cache", "lru-cache",
"nix", "nix 0.29.0",
"num_cpus", "num_cpus",
"once_cell", "once_cell",
"opentelemetry", "opentelemetry",
@ -1523,6 +1529,18 @@ dependencies = [
"windows-sys 0.52.0", "windows-sys 0.52.0",
] ]
[[package]]
name = "nix"
version = "0.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab2156c4fce2f8df6c499cc1c763e4394b7482525bf2a9701c9d79d215f519e4"
dependencies = [
"bitflags 2.6.0",
"cfg-if",
"cfg_aliases 0.1.1",
"libc",
]
[[package]] [[package]]
name = "nix" name = "nix"
version = "0.29.0" version = "0.29.0"
@ -1531,7 +1549,7 @@ checksum = "71e2746dc3a24dd78b3cfcb7be93368c6de9963d30f43a6a73998a9cf4b17b46"
dependencies = [ dependencies = [
"bitflags 2.6.0", "bitflags 2.6.0",
"cfg-if", "cfg-if",
"cfg_aliases", "cfg_aliases 0.2.1",
"libc", "libc",
] ]
@ -1917,6 +1935,16 @@ dependencies = [
"unicode-ident", "unicode-ident",
] ]
[[package]]
name = "process-wrap"
version = "8.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38ee68ae331824036479c84060534b18254c864fa73366c58d86db3b7b811619"
dependencies = [
"indexmap 2.5.0",
"nix 0.28.0",
]
[[package]] [[package]]
name = "prometheus" name = "prometheus"
version = "0.13.4" version = "0.13.4"
@ -2741,6 +2769,16 @@ version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64"
[[package]]
name = "signal-hook"
version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8621587d4798caf8eb44879d42e56b9a93ea5dcd315a6487c357130095b62801"
dependencies = [
"libc",
"signal-hook-registry",
]
[[package]] [[package]]
name = "signal-hook-registry" name = "signal-hook-registry"
version = "1.4.2" version = "1.4.2"
@ -3867,8 +3905,10 @@ dependencies = [
"clap", "clap",
"indicatif", "indicatif",
"miette", "miette",
"process-wrap",
"serde", "serde",
"serde_json", "serde_json",
"signal-hook",
"strum", "strum",
"xshell", "xshell",
] ]

View file

@ -111,6 +111,7 @@ opentelemetry-prometheus = "0.17.0"
opentelemetry_sdk = { version = "0.24.0", features = ["rt-tokio"] } opentelemetry_sdk = { version = "0.24.0", features = ["rt-tokio"] }
parking_lot = "0.12.3" parking_lot = "0.12.3"
phf = { version = "0.11.2", features = ["macros"] } phf = { version = "0.11.2", features = ["macros"] }
process-wrap = { version = "8.0.2", default-features = false, features = ["std", "process-group"] }
prometheus = "0.13.4" prometheus = "0.13.4"
rand = "0.8.5" rand = "0.8.5"
regex = "1.10.6" regex = "1.10.6"
@ -126,6 +127,7 @@ serde_html_form = "0.2.6"
serde_json = { version = "1.0.128", features = ["raw_value"] } serde_json = { version = "1.0.128", features = ["raw_value"] }
serde_yaml = "0.9.34" serde_yaml = "0.9.34"
sha-1 = "0.10.1" sha-1 = "0.10.1"
signal-hook = "0.3.17"
strum = { version = "0.26.3", features = ["derive"] } strum = { version = "0.26.3", features = ["derive"] }
thiserror = "1.0.64" thiserror = "1.0.64"
thread_local = "1.1.8" thread_local = "1.1.8"

View file

@ -8,9 +8,11 @@ rust-version.workspace = true
[dependencies] [dependencies]
clap.workspace = true clap.workspace = true
miette.workspace = true miette.workspace = true
process-wrap.workspace = true
indicatif.workspace = true indicatif.workspace = true
serde.workspace = true serde.workspace = true
serde_json.workspace = true serde_json.workspace = true
signal-hook.workspace = true
strum.workspace = true strum.workspace = true
xshell.workspace = true xshell.workspace = true

View file

@ -6,14 +6,26 @@ use std::{
collections::BTreeMap, collections::BTreeMap,
fs::{self, File}, fs::{self, File},
io::{BufRead, BufReader, BufWriter, Seek, SeekFrom, Write}, io::{BufRead, BufReader, BufWriter, Seek, SeekFrom, Write},
mem, panic,
path::{Path, PathBuf}, path::{Path, PathBuf},
process::{Command, Stdio}, process::{Command, Stdio},
sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex,
},
thread,
time::Duration, time::Duration,
}; };
use indicatif::{ProgressBar, ProgressStyle}; use indicatif::{ProgressBar, ProgressStyle};
use miette::{miette, IntoDiagnostic, LabeledSpan, Result, WrapErr}; use miette::{miette, IntoDiagnostic, LabeledSpan, Result, WrapErr};
use process_wrap::std::{ProcessGroup, StdChildWrapper, StdCommandWrap};
use serde::Deserialize; use serde::Deserialize;
use signal_hook::{
consts::signal::{SIGINT, SIGQUIT, SIGTERM},
flag,
iterator::Signals,
};
use strum::{Display, EnumString}; use strum::{Display, EnumString};
use xshell::{cmd, Shell}; use xshell::{cmd, Shell};
@ -37,27 +49,195 @@ pub(crate) fn count_complement_tests(
Ok(test_count) Ok(test_count)
} }
/// Runs complement test suite /// Run complement tests.
///
/// This function mostly deals with handling shutdown signals, while the actual
/// logic for running complement is in `run_complement_inner`, which is spawned
/// as a separate thread. This is necessary because the go `test2json` tool
/// ignores SIGTERM and SIGINT. Without signal handling on our end, terminating
/// the complement wrapper process would leave a dangling complement child
/// process running.
///
/// The reason that `test2json` does this is that it does not implement any kind
/// of test cleanup, and so the developers decided that ignoring termination
/// signals entirely was safer. Running go unit tests outside of `test2json`
/// (and so without machine-readable output) does not have this limitation.
/// Unfortunately neither of these are an option for us. We need
/// machine-readable output to compare against the baseline result. Complement
/// runs can take 40+ minutes, so being able to cancel them is a requirement.
///
/// Because we don't trigger any of the normal cleanup, we need to handle
/// dangling docker containers ourselves.
pub(crate) fn run_complement( pub(crate) fn run_complement(
sh: &Shell, sh: &Shell,
out: &Path, out: &Path,
docker_image: &str, docker_image: &str,
test_count: u64, test_count: u64,
) -> Result<TestResults> { ) -> Result<TestResults> {
// TODO: handle SIG{INT,TERM} let term_signals = [SIGTERM, SIGINT, SIGQUIT];
let term_now = Arc::new(AtomicBool::new(false));
for sig in &term_signals {
// Terminate immediately if `term_now` is true and we receive a
// terminating signal
flag::register_conditional_shutdown(*sig, 1, Arc::clone(&term_now))
.into_diagnostic()
.wrap_err("error registering signal handler")?;
}
let mut signals = Signals::new(term_signals).unwrap();
let state = Mutex::new(ComplementRunnerState::Startup);
let signals_handle = signals.handle();
let result = thread::scope(|s| {
let state_ref = &state;
let cloned_sh = sh.clone();
let thread_handle = s.spawn(move || {
let panic_result = panic::catch_unwind(|| {
run_complement_inner(
&cloned_sh,
out,
docker_image,
test_count,
state_ref,
)
});
// Stop the signal-handling loop, even if we panicked
signals_handle.close();
match panic_result {
Ok(result) => result,
Err(panic) => panic::resume_unwind(panic),
}
});
let canceled = if let Some(signal) = signals.forever().next() {
let description = match signal {
SIGTERM => "SIGTERM",
SIGINT => "ctrl+c",
SIGQUIT => "SIGQUIT",
_ => unreachable!(),
};
eprintln!(
"Received {description}, stopping complement run. Send \
{description} a second time to terminate without cleaning \
up, which may leave dangling processes and docker containers"
);
term_now.store(true, Ordering::Relaxed);
{
let mut state = state.lock().unwrap();
let old_state =
mem::replace(&mut *state, ComplementRunnerState::Shutdown);
match old_state {
ComplementRunnerState::Startup => (),
ComplementRunnerState::Shutdown => unreachable!(),
ComplementRunnerState::Running(mut child) => {
// Killing the child process should terminate the
// complement runner thread in a
// bounded amount of time, because it will cause the
// stdout reader to return EOF.
child.kill().unwrap();
}
}
}
// TODO: kill dangling docker containers
eprintln!(
"WARNING: complement may have left dangling docker \
containers. Cleanup for these is planned, but has not been \
implemented yet. You need to identify and kill them manually"
);
true
} else {
// hit this branch if the signal handler is closed by the complement
// runner thread. This means the complement run finished
// without being canceled.
false
};
match thread_handle.join() {
Ok(result) => {
if canceled {
Err(miette!("complement run was canceled"))
} else {
result
}
}
Err(panic_value) => panic::resume_unwind(panic_value),
}
});
// From this point on, terminate immediately when signalled
term_now.store(true, Ordering::Relaxed);
result
}
/// Possible states for the complement runner thread.
///
/// The current state should be protected by a mutex, where state changes are
/// only performed while the mutex is locked. This is to prevent a race
/// condition where the main thread handles a shutdown signal at the same time
/// that the complement runner thread is starting the child process, and so the
/// main thread fails to kill the child process.
///
/// Valid state transitions:
///
/// - `Startup` -> `Running` (the runner thread spawned the child process)
/// - `Startup` -> `Shutdown` (a signal arrived before the child was spawned)
/// - `Running` -> `Shutdown` (a signal arrived while the child was running)
#[derive(Debug)]
enum ComplementRunnerState {
/// The complement child process has not been started yet. If the runner
/// thread finds `Shutdown` instead of this state, it returns a "canceled"
/// error without spawning anything.
Startup,
/// The complement child process is running, and we have not yet received
/// a shutdown signal. The child handle is stored here so that the
/// signal-handling thread can take it out and kill it.
Running(Box<dyn StdChildWrapper>),
/// We have received a shutdown signal. This state is terminal; the signal
/// handler treats finding it already set as unreachable.
Shutdown,
}
/// Spawn complement child process and handle its output
///
/// This is the "complement runner" thread, spawned by the [`run_complement`]
/// function.
fn run_complement_inner(
sh: &Shell,
out: &Path,
docker_image: &str,
test_count: u64,
state: &Mutex<ComplementRunnerState>,
) -> Result<TestResults> {
let cmd = cmd!(sh, "go tool test2json complement.test -test.v=test2json") let cmd = cmd!(sh, "go tool test2json complement.test -test.v=test2json")
.env("COMPLEMENT_BASE_IMAGE", docker_image) .env("COMPLEMENT_BASE_IMAGE", docker_image)
.env("COMPLEMENT_SPAWN_HS_TIMEOUT", "5") .env("COMPLEMENT_SPAWN_HS_TIMEOUT", "5")
.env("COMPLEMENT_ALWAYS_PRINT_SERVER_LOGS", "1"); .env("COMPLEMENT_ALWAYS_PRINT_SERVER_LOGS", "1");
eprintln!("$ {cmd}"); eprintln!("$ {cmd}");
let child = Command::from(cmd)
.stdout(Stdio::piped()) let stdout = {
.spawn() let mut state = state.lock().unwrap();
.into_diagnostic() match &*state {
.wrap_err("error spawning complement process")?; ComplementRunnerState::Startup => (),
let stdout = child ComplementRunnerState::Running(_) => unreachable!(),
.stdout ComplementRunnerState::Shutdown => {
.expect("child process spawned with piped stdout should have stdout"); return Err(miette!("complement run was canceled"))
}
}
let mut cmd = Command::from(cmd);
cmd.stdout(Stdio::piped());
let mut child = StdCommandWrap::from(cmd)
.wrap(ProcessGroup::leader())
.spawn()
.into_diagnostic()
.wrap_err("error spawning complement process")?;
let stdout = child.stdout().take().expect(
"child process spawned with piped stdout should have stdout",
);
*state = ComplementRunnerState::Running(child);
stdout
};
let lines = BufReader::new(stdout).lines(); let lines = BufReader::new(stdout).lines();
let mut ctx = TestContext::new(out, test_count)?; let mut ctx = TestContext::new(out, test_count)?;