diff --git a/Cargo.lock b/Cargo.lock index 8ce87cc6..a4c3a2dd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -391,6 +391,12 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "cfg_aliases" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd16c4719339c4530435d38e511904438d07cce7950afa3718a84ac36c10e89e" + [[package]] name = "cfg_aliases" version = "0.2.1" @@ -845,7 +851,7 @@ dependencies = [ "image", "jsonwebtoken", "lru-cache", - "nix", + "nix 0.29.0", "num_cpus", "once_cell", "opentelemetry", @@ -1523,6 +1529,18 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "nix" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab2156c4fce2f8df6c499cc1c763e4394b7482525bf2a9701c9d79d215f519e4" +dependencies = [ + "bitflags 2.6.0", + "cfg-if", + "cfg_aliases 0.1.1", + "libc", +] + [[package]] name = "nix" version = "0.29.0" @@ -1531,7 +1549,7 @@ checksum = "71e2746dc3a24dd78b3cfcb7be93368c6de9963d30f43a6a73998a9cf4b17b46" dependencies = [ "bitflags 2.6.0", "cfg-if", - "cfg_aliases", + "cfg_aliases 0.2.1", "libc", ] @@ -1917,6 +1935,16 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "process-wrap" +version = "8.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38ee68ae331824036479c84060534b18254c864fa73366c58d86db3b7b811619" +dependencies = [ + "indexmap 2.5.0", + "nix 0.28.0", +] + [[package]] name = "prometheus" version = "0.13.4" @@ -2741,6 +2769,16 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +[[package]] +name = "signal-hook" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"8621587d4798caf8eb44879d42e56b9a93ea5dcd315a6487c357130095b62801" +dependencies = [ + "libc", + "signal-hook-registry", +] + [[package]] name = "signal-hook-registry" version = "1.4.2" @@ -3867,8 +3905,10 @@ dependencies = [ "clap", "indicatif", "miette", + "process-wrap", "serde", "serde_json", + "signal-hook", "strum", "xshell", ] diff --git a/Cargo.toml b/Cargo.toml index 9bfe1f7c..aae44a47 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -111,6 +111,7 @@ opentelemetry-prometheus = "0.17.0" opentelemetry_sdk = { version = "0.24.0", features = ["rt-tokio"] } parking_lot = "0.12.3" phf = { version = "0.11.2", features = ["macros"] } +process-wrap = { version = "8.0.2", default-features = false, features = ["std", "process-group"] } prometheus = "0.13.4" rand = "0.8.5" regex = "1.10.6" @@ -126,6 +127,7 @@ serde_html_form = "0.2.6" serde_json = { version = "1.0.128", features = ["raw_value"] } serde_yaml = "0.9.34" sha-1 = "0.10.1" +signal-hook = "0.3.17" strum = { version = "0.26.3", features = ["derive"] } thiserror = "1.0.64" thread_local = "1.1.8" diff --git a/xtask/Cargo.toml b/xtask/Cargo.toml index 511e054f..85f2f210 100644 --- a/xtask/Cargo.toml +++ b/xtask/Cargo.toml @@ -8,9 +8,11 @@ rust-version.workspace = true [dependencies] clap.workspace = true miette.workspace = true +process-wrap.workspace = true indicatif.workspace = true serde.workspace = true serde_json.workspace = true +signal-hook.workspace = true strum.workspace = true xshell.workspace = true diff --git a/xtask/src/complement/test2json.rs b/xtask/src/complement/test2json.rs index 217a2a4f..a49a38ed 100644 --- a/xtask/src/complement/test2json.rs +++ b/xtask/src/complement/test2json.rs @@ -6,14 +6,26 @@ use std::{ collections::BTreeMap, fs::{self, File}, io::{BufRead, BufReader, BufWriter, Seek, SeekFrom, Write}, + mem, panic, path::{Path, PathBuf}, process::{Command, Stdio}, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, Mutex, + }, + thread, time::Duration, }; use indicatif::{ProgressBar, 
ProgressStyle}; use miette::{miette, IntoDiagnostic, LabeledSpan, Result, WrapErr}; +use process_wrap::std::{ProcessGroup, StdChildWrapper, StdCommandWrap}; use serde::Deserialize; +use signal_hook::{ + consts::signal::{SIGINT, SIGQUIT, SIGTERM}, + flag, + iterator::Signals, +}; use strum::{Display, EnumString}; use xshell::{cmd, Shell}; @@ -37,27 +49,195 @@ pub(crate) fn count_complement_tests( Ok(test_count) } -/// Runs complement test suite +/// Run complement tests. +/// +/// This function mostly deals with handling shutdown signals, while the actual +/// logic for running complement is in `run_complement_inner`, which is spawned +/// as a separate thread. This is necessary because the go `test2json` tool +/// ignores SIGTERM and SIGINT. Without signal handling on our end, terminating +/// the complement wrapper process would leave a dangling complement child +/// process running. +/// +/// The reason that `test2json` does this is that it does not implement any kind +/// of test cleanup, and so the developers decided that ignoring termination +/// signals entirely was safer. Running go unit tests outside of `test2json` +/// (and so without machine-readable output) does not have this limitation. +/// Unfortunately neither of these are an option for us. We need +/// machine-readable output to compare against the baseline result. Complement +/// runs can take 40+ minutes, so being able to cancel them is a requirement. +/// +/// Because we don't trigger any of the normal cleanup, we need to handle +/// dangling docker containers ourselves. 
pub(crate) fn run_complement( sh: &Shell, out: &Path, docker_image: &str, test_count: u64, ) -> Result { - // TODO: handle SIG{INT,TERM} + let term_signals = [SIGTERM, SIGINT, SIGQUIT]; + + let term_now = Arc::new(AtomicBool::new(false)); + for sig in &term_signals { + // Terminate immediately if `term_now` is true and we receive a + // terminating signal + flag::register_conditional_shutdown(*sig, 1, Arc::clone(&term_now)) + .into_diagnostic() + .wrap_err("error registering signal handler")?; + } + + let mut signals = Signals::new(term_signals).into_diagnostic()?; + + let state = Mutex::new(ComplementRunnerState::Startup); + let signals_handle = signals.handle(); + + let result = thread::scope(|s| { + let state_ref = &state; + let cloned_sh = sh.clone(); + let thread_handle = s.spawn(move || { + let panic_result = panic::catch_unwind(|| { + run_complement_inner( + &cloned_sh, + out, + docker_image, + test_count, + state_ref, + ) + }); + // Stop the signal-handling loop, even if we panicked + signals_handle.close(); + match panic_result { + Ok(result) => result, + Err(panic) => panic::resume_unwind(panic), + } + }); + + let canceled = if let Some(signal) = signals.forever().next() { + let description = match signal { + SIGTERM => "SIGTERM", + SIGINT => "ctrl+c", + SIGQUIT => "SIGQUIT", + _ => unreachable!(), + }; + eprintln!( + "Received {description}, stopping complement run. 
Send \ + {description} a second time to terminate without cleaning \ + up, which may leave dangling processes and docker containers" + ); + term_now.store(true, Ordering::Relaxed); + + { + let mut state = state.lock().unwrap(); + let old_state = + mem::replace(&mut *state, ComplementRunnerState::Shutdown); + match old_state { + ComplementRunnerState::Startup => (), + ComplementRunnerState::Shutdown => unreachable!(), + ComplementRunnerState::Running(mut child) => { + // Killing the child process should terminate the + // complement runner thread in a + // bounded amount of time, because it will cause the + // stdout reader to return EOF. + child.kill().unwrap(); + } + } + } + + // TODO: kill dangling docker containers + eprintln!( + "WARNING: complement may have left dangling docker \ + containers. Cleanup for these is planned, but has not been \ + implemented yet. You need to identify and kill them manually" + ); + + true + } else { + // hit this branch if the signal handler is closed by the complement + // runner thread. This means the complement run finished + // without being canceled. + false + }; + + match thread_handle.join() { + Ok(result) => { + if canceled { + Err(miette!("complement run was canceled")) + } else { + result + } + } + Err(panic_value) => panic::resume_unwind(panic_value), + } + }); + + // From this point on, terminate immediately when signalled + term_now.store(true, Ordering::Relaxed); + + result +} + +/// Possible states for the complement runner thread. +/// +/// The current state should be protected by a mutex, where state changes are +/// only performed while the mutex is locked. This is to prevent a race +/// condition where the main thread handles a shutdown signal at the same time +/// that the complement runner thread is starting the child process, and so the +/// main thread fails to kill the child process. 
+/// +/// Valid state transitions: +/// +/// - `Startup` -> `Running` +/// - `Startup` -> `Shutdown` +/// - `Running` -> `Shutdown` +#[derive(Debug)] +enum ComplementRunnerState { + /// The complement child process has not been started yet + Startup, + /// The complement child process is running, and we have not yet received + /// a shutdown signal. + Running(Box<dyn StdChildWrapper>), + /// We have received a shutdown signal. + Shutdown, +} + +/// Spawn complement child process and handle its output +/// +/// This is the "complement runner" thread, spawned by the [`run_complement`] +/// function. +fn run_complement_inner( + sh: &Shell, + out: &Path, + docker_image: &str, + test_count: u64, + state: &Mutex<ComplementRunnerState>, +) -> Result { let cmd = cmd!(sh, "go tool test2json complement.test -test.v=test2json") .env("COMPLEMENT_BASE_IMAGE", docker_image) .env("COMPLEMENT_SPAWN_HS_TIMEOUT", "5") .env("COMPLEMENT_ALWAYS_PRINT_SERVER_LOGS", "1"); eprintln!("$ {cmd}"); - let child = Command::from(cmd) - .stdout(Stdio::piped()) - .spawn() - .into_diagnostic() - .wrap_err("error spawning complement process")?; - let stdout = child - .stdout - .expect("child process spawned with piped stdout should have stdout"); + + let stdout = { + let mut state = state.lock().unwrap(); + match &*state { + ComplementRunnerState::Startup => (), + ComplementRunnerState::Running(_) => unreachable!(), + ComplementRunnerState::Shutdown => { + return Err(miette!("complement run was canceled")) + } + } + let mut cmd = Command::from(cmd); + cmd.stdout(Stdio::piped()); + let mut child = StdCommandWrap::from(cmd) + .wrap(ProcessGroup::leader()) + .spawn() + .into_diagnostic() + .wrap_err("error spawning complement process")?; + let stdout = child.stdout().take().expect( + "child process spawned with piped stdout should have stdout", + ); + *state = ComplementRunnerState::Running(child); + stdout + }; let lines = BufReader::new(stdout).lines(); let mut ctx = TestContext::new(out, test_count)?;