//! Pod runtime command をサブプロセスとして立ち上げ、`YOI-READY` を待つ //! ハンドシェイク。 //! //! - 親プロセス (TUI / GUI / E2E) は profile/default/typed restore flags を //! 指定してこの関数に渡す。pod はそれを受けて socket を bind し、stderr に //! `YOI-READY\t\t` を吐く。 //! - 待機中の stderr 行は `progress` コールバック越しに呼び出し側へ流す。 //! UI の進捗表示や E2E のログ収集はここで賄う。 //! - `kill_on_drop = false` + `process_group(0)` により、親プロセス //! ライフサイクルから切り離した detached pod を作る。ready 後の lifecycle //! 管理は runtime ディレクトリ / socket を介して行う。 use std::io; use std::path::{Path, PathBuf}; use std::process::Stdio; use std::time::Duration; use crate::PodRuntimeCommand; use tokio::process::Command; use uuid::Uuid; const READY_PREFIX: &str = "YOI-READY\t"; const READY_TIMEOUT: Duration = Duration::from_secs(20); #[derive(Debug, Clone, PartialEq, Eq)] pub struct SpawnConfig { pub runtime_command: PodRuntimeCommand, /// `pod.name` として使う識別子。runtime ディレクトリ /// (`manifest::paths::pod_runtime_dir`) の解決と、ready 行に乗る /// 名前との突き合わせに使う。 pub pod_name: String, /// Optional profile selector. When present the child is launched with /// `--profile`; the Pod name is supplied through `--profile-pod-name` so /// profile evaluation stays separate from `--pod` restore semantics. pub profile: Option, /// pod の current_dir。 pub cwd: PathBuf, /// `Some(id)` のとき `--session ` を付与し、当該セッションから /// resume させる。 pub resume_from: Option, /// true のとき `--pod ` を付与し、pod 側で name-keyed state /// があれば resume、なければ同名の新規 Pod として起動させる。 pub resume_by_pod_name: bool, } #[derive(Debug, Clone, PartialEq, Eq)] pub struct SpawnReady { pub pod_name: String, pub socket_path: PathBuf, } #[derive(Debug)] pub enum SpawnError { Io(io::Error), /// runtime ディレクトリが解決できなかった (環境変数未設定等)。 RuntimeDirUnavailable, PodLaunchFailed { command: PodRuntimeCommand, source: io::Error, }, PodExitedEarly { stderr_tail: String, }, Timeout, } impl std::fmt::Display for SpawnError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Self::Io(e) => write!(f, "io error: {e}"), Self::RuntimeDirUnavailable => write!( f, "could not resolve runtime directory (set YOI_HOME, YOI_RUNTIME_DIR, XDG_RUNTIME_DIR, or HOME)" ), Self::PodLaunchFailed { command, source } => write!( f, "failed to launch pod runtime command `{command}`: {source}" ), Self::PodExitedEarly { stderr_tail } => { if stderr_tail.is_empty() { write!(f, "pod exited before becoming ready") } else { write!(f, "pod exited before becoming ready: {stderr_tail}") } } Self::Timeout => write!( f, "pod did not become ready within {}s", READY_TIMEOUT.as_secs() ), } } } impl std::error::Error for SpawnError { fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { match self { Self::Io(error) | Self::PodLaunchFailed { source: error, .. } => Some(error), Self::RuntimeDirUnavailable | Self::PodExitedEarly { .. } | Self::Timeout => None, } } } impl From for SpawnError { fn from(e: io::Error) -> Self { Self::Io(e) } } /// pod を spawn し、`YOI-READY` ハンドシェイクが終わるまで待つ。 /// /// `progress` は ready 行を見つけるまでに観測した stderr の各行で呼ばれる /// (ready 行自体は除外される)。UI の表示更新や E2E ログ取得に使う。 pub async fn spawn_pod(config: SpawnConfig, mut progress: F) -> Result where F: FnMut(&str), { let pod_runtime_dir = manifest::paths::pod_runtime_dir(&config.pod_name) .ok_or(SpawnError::RuntimeDirUnavailable)?; std::fs::create_dir_all(&pod_runtime_dir).map_err(SpawnError::Io)?; let stderr_path = pod_runtime_dir.join("stderr.log"); let stderr_file = std::fs::File::create(&stderr_path).map_err(SpawnError::Io)?; let mut command = Command::new(config.runtime_command.program()); command .args(config.runtime_command.prefix_args()) .current_dir(&config.cwd) .stdin(Stdio::null()) .stdout(Stdio::null()) .stderr(Stdio::from(stderr_file)) .process_group(0); if let Some(profile) = &config.profile { command .arg("--profile") .arg(profile) .arg("--profile-pod-name") .arg(&config.pod_name); } if config.resume_by_pod_name && config.profile.is_none() { command.arg("--pod").arg(&config.pod_name); } if let Some(id) = config.resume_from { command .arg("--session") .arg(id.to_string()) .arg("--session-pod-name") .arg(&config.pod_name); } let mut child = command .spawn() .map_err(|source| SpawnError::PodLaunchFailed { command: config.runtime_command.clone(), source, })?; // Default `kill_on_drop = false` plus `process_group(0)` makes this // a detached Pod once startup succeeds: dropping the handle does not // terminate it, and terminal-generated signals for the parent's // process group do not hit the Pod. Runtime state/socket files are // the source of truth after that point. let ready = match wait_for_ready_file(&mut progress, &stderr_path, &mut child).await { Ok(ready) => ready, Err(e) => { let _ = child.start_kill(); let _ = child.wait().await; return Err(e); } }; tokio::spawn(async move { let _ = child.wait().await; }); Ok(ready) } async fn wait_for_ready_file( progress: &mut F, stderr_path: &Path, child: &mut tokio::process::Child, ) -> Result where F: FnMut(&str), { let mut tail = StderrTail::new(); let deadline = tokio::time::Instant::now() + READY_TIMEOUT; let mut offset = 0usize; loop { let content = match tokio::fs::read_to_string(stderr_path).await { Ok(content) => content, Err(e) if e.kind() == io::ErrorKind::NotFound => String::new(), Err(e) => return Err(SpawnError::Io(e)), }; if content.len() > offset { for line in content[offset..].lines() { if let Some(rest) = line.strip_prefix(READY_PREFIX) { let mut parts = rest.splitn(2, '\t'); let pod_name = parts.next().unwrap_or("").to_string(); let socket_str = parts.next().unwrap_or("").to_string(); if pod_name.is_empty() || socket_str.is_empty() { return Err(SpawnError::PodExitedEarly { stderr_tail: format!("malformed ready line: {line}"), }); } let socket_path = PathBuf::from(socket_str); wait_for_socket( &socket_path, deadline, child, stderr_path, &mut tail, &mut offset, ) .await?; return Ok(SpawnReady { pod_name, socket_path, }); } tail.push(line); progress(line); } offset = content.len(); } if tokio::time::Instant::now() >= deadline { return Err(SpawnError::Timeout); } tokio::select! { status = child.wait() => { let _ = status; // Pod は exit 直前に最終 stderr 行を flush することがある。 // child.wait() が解決した後に再読みして、原因行を取りこ // ぼさず PodExitedEarly に載せる。 drain_stderr_into_tail(stderr_path, &mut tail, &mut offset).await; return Err(SpawnError::PodExitedEarly { stderr_tail: tail.into_string(), }); } _ = tokio::time::sleep(Duration::from_millis(100)) => {} } } } async fn wait_for_socket( socket_path: &Path, deadline: tokio::time::Instant, child: &mut tokio::process::Child, stderr_path: &Path, tail: &mut StderrTail, offset: &mut usize, ) -> Result<(), SpawnError> { loop { match tokio::net::UnixStream::connect(socket_path).await { Ok(_) => return Ok(()), Err(e) if e.kind() == io::ErrorKind::NotFound || e.kind() == io::ErrorKind::ConnectionRefused => {} Err(e) => return Err(SpawnError::Io(e)), } if tokio::time::Instant::now() >= deadline { return Err(SpawnError::Timeout); } tokio::select! { status = child.wait() => { let _ = status; drain_stderr_into_tail(stderr_path, tail, offset).await; return Err(SpawnError::PodExitedEarly { stderr_tail: tail.as_string(), }); } _ = tokio::time::sleep(Duration::from_millis(50)) => {} } } } async fn drain_stderr_into_tail(stderr_path: &Path, tail: &mut StderrTail, offset: &mut usize) { let Ok(content) = tokio::fs::read_to_string(stderr_path).await else { return; }; if content.len() <= *offset { return; } for line in content[*offset..].lines() { if !line.starts_with(READY_PREFIX) { tail.push(line); } } *offset = content.len(); } struct StderrTail { lines: std::collections::VecDeque, } impl StderrTail { fn new() -> Self { Self { lines: std::collections::VecDeque::with_capacity(8), } } fn push(&mut self, line: &str) { if self.lines.len() == 8 { self.lines.pop_front(); } self.lines.push_back(line.to_string()); } fn as_string(&self) -> String { self.lines.iter().cloned().collect::>().join(" | ") } fn into_string(self) -> String { self.lines.into_iter().collect::>().join(" | ") } }