From 1ca5663298722debd02b201de4047243b54bb577 Mon Sep 17 00:00:00 2001 From: Hare Date: Wed, 24 Jun 2026 00:23:10 +0900 Subject: [PATCH] fix: harden StopPod registry cleanup --- AGENTS.md | 1 + crates/client/src/ticket_role.rs | 13 +----- crates/pod-registry/src/table.rs | 37 ++++++++++++++++- crates/pod/src/spawn/registry.rs | 67 +++++++++++++++++++++++-------- crates/tui/src/dashboard/mod.rs | 2 +- crates/tui/src/dashboard/tests.rs | 5 ++- 6 files changed, 94 insertions(+), 31 deletions(-) diff --git a/AGENTS.md b/AGENTS.md index dfe2e10a..789c1250 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -31,6 +31,7 @@ Podの状態から純粋に再現可能で、且つ揮発性の無い操作で 明示的に指示されない限り、読み取り以外の操作は控えること。 基本はworktree上の一時的なブランチでコミットを重ね、メインブランチに取り込む運用をしている。 +Orchestrator の cwd が orchestration 用ブランチ/worktree の場合、通常作業では親ブランチの dirty state を気にしない。 コミットメッセージは適当に`: *簡潔な1行*`で書いている。 外部の参考プロジェクトは必要に応じてローカルの外部 checkout からReadすること。 diff --git a/crates/client/src/ticket_role.rs b/crates/client/src/ticket_role.rs index 2a849009..b9265759 100644 --- a/crates/client/src/ticket_role.rs +++ b/crates/client/src/ticket_role.rs @@ -547,9 +547,6 @@ fn append_operation_targets(out: &mut String, context: &TicketRoleLaunchContext) if context.role != TicketRole::Orchestrator { return; } - if context.original_workspace_root.is_none() && context.target_workspace_root.is_none() { - return; - } out.push_str("\nOrchestrator operation targets:\n"); push_bounded_bullet( @@ -557,13 +554,6 @@ fn append_operation_targets(out: &mut String, context: &TicketRoleLaunchContext) "implementation_worktree_root", &context.implementation_worktree_root().display().to_string(), ); - if context.target_workspace_root.is_some() { - push_bounded_bullet( - out, - "merge_target_workspace_root", - &context.target_workspace_root().display().to_string(), - ); - } } fn default_pod_name(role: TicketRole, ticket: Option<&TicketRef>) -> String { @@ -706,6 +696,7 @@ mod tests { context_tokens: 0, }, status: PodStatus::Idle, + in_flight: protocol::InFlightSnapshot::default(), } } @@ -1152,7 +1143,7 @@ workflow = "ticket-review-workflow" assert!(text.contains("Orchestrator operation targets:")); assert!(text.contains("implementation_worktree_root")); - assert!(text.contains("merge_target_workspace_root")); + assert!(!text.contains("merge_target_workspace_root")); assert!(!text.contains("Workspace routing context:")); assert!(!text.contains("role_workspace_root")); assert!(!text.contains("role_cwd")); diff --git a/crates/pod-registry/src/table.rs b/crates/pod-registry/src/table.rs index 02f63111..b51c1ba7 100644 --- a/crates/pod-registry/src/table.rs +++ b/crates/pod-registry/src/table.rs @@ -4,12 +4,17 @@ use std::fs::{DirBuilder, File, OpenOptions}; use std::io::{self, Read, Seek, SeekFrom, Write}; use std::os::unix::fs::{DirBuilderExt, OpenOptionsExt}; use std::path::{Path, PathBuf}; +use std::thread; +use std::time::{Duration, Instant}; use fs4::fs_std::FileExt; use manifest::{ScopeRule, paths}; use serde::{Deserialize, Serialize}; use session_store::SegmentId; +const LOCK_WAIT_TIMEOUT: Duration = Duration::from_secs(10); +const LOCK_WAIT_POLL_INTERVAL: Duration = Duration::from_millis(25); + /// On-disk representation of the allocation table. #[derive(Debug, Clone, Default, Serialize, Deserialize)] pub struct LockFile { @@ -119,7 +124,37 @@ impl LockFileGuard { .truncate(false) .mode(0o600) .open(path)?; - FileExt::lock_exclusive(&file)?; + let started = Instant::now(); + loop { + match FileExt::try_lock_exclusive(&file) { + Ok(true) => break, + Ok(false) => { + if started.elapsed() >= LOCK_WAIT_TIMEOUT { + return Err(io::Error::new( + io::ErrorKind::TimedOut, + format!( + "timed out waiting for pod registry lock `{}`", + path.display() + ), + )); + } + thread::sleep(LOCK_WAIT_POLL_INTERVAL); + } + Err(error) if error.kind() == io::ErrorKind::WouldBlock => { + if started.elapsed() >= LOCK_WAIT_TIMEOUT { + return Err(io::Error::new( + io::ErrorKind::TimedOut, + format!( + "timed out waiting for pod registry lock `{}`", + path.display() + ), + )); + } + thread::sleep(LOCK_WAIT_POLL_INTERVAL); + } + Err(error) => return Err(error), + } + } let mut this = Self { file, data: LockFile::default(), diff --git a/crates/pod/src/spawn/registry.rs b/crates/pod/src/spawn/registry.rs index 307100ed..872a5698 100644 --- a/crates/pod/src/spawn/registry.rs +++ b/crates/pod/src/spawn/registry.rs @@ -35,10 +35,12 @@ type RegistryStateWriter = Arc io::Result<()> + S type RegistryReclaimWriter = Arc io::Result<()> + Send + Sync>; const RESTORE_REACHABILITY_TIMEOUT: Duration = Duration::from_millis(500); +const REGISTRY_CLEANUP_TIMEOUT: Duration = Duration::from_secs(15); pub struct SpawnedPodRegistry { records: Mutex>, cursors: Mutex>, + mutations: Mutex<()>, runtime_dir: Arc, state_writer: Option, reclaim_writer: Option, @@ -56,6 +58,7 @@ impl SpawnedPodRegistry { Arc::new(Self { records: Mutex::new(Vec::new()), cursors: Mutex::new(HashMap::new()), + mutations: Mutex::new(()), runtime_dir, state_writer: None, reclaim_writer: None, @@ -164,6 +167,7 @@ impl SpawnedPodRegistry { registry: Arc::new(Self { records: Mutex::new(records), cursors: Mutex::new(HashMap::new()), + mutations: Mutex::new(()), runtime_dir, state_writer: Some(state_writer), reclaim_writer: Some(reclaim_writer), @@ -178,9 +182,13 @@ impl SpawnedPodRegistry { /// error if either persisted write fails; the in-memory state is still /// updated in that case — the next successful write will reconcile. pub async fn add(&self, record: SpawnedPodRecord) -> io::Result<()> { - let mut records = self.records.lock().await; - records.push(record); - self.persist_records(records.as_slice()).await + let _mutation = self.mutations.lock().await; + let snapshot = { + let mut records = self.records.lock().await; + records.push(record); + records.clone() + }; + self.persist_records(&snapshot).await } /// Look up a record by pod name. Cloned so callers can drop the lock. @@ -201,29 +209,39 @@ impl SpawnedPodRegistry { /// reclaim any delegated Write scope owned by that child. Returns the /// removed record (if any). pub async fn remove(&self, pod_name: &str) -> io::Result> { - let removed = { + let _mutation = self.mutations.lock().await; + let (removed, snapshot) = { let mut records = self.records.lock().await; let idx = records.iter().position(|r| r.pod_name == pod_name); let removed = idx.map(|i| records.remove(i)); - self.persist_records(records.as_slice()).await?; - removed + let snapshot = records.clone(); + (removed, snapshot) }; + self.persist_records(&snapshot).await?; self.cursors.lock().await.remove(pod_name); if let Some(record) = &removed { - self.reclaim_record(record)?; - if let Some(write_reclaim) = &self.reclaim_writer { - write_reclaim(record)?; - } + self.reclaim_removed_record(record.clone()).await?; } Ok(removed) } - fn reclaim_record(&self, record: &SpawnedPodRecord) -> io::Result<()> { - let Some(parent_name) = &self.parent_name else { - release_child_allocation(&record.pod_name)?; - return Ok(()); - }; - reclaim_record(parent_name, self.parent_scope.as_ref(), record) + async fn reclaim_removed_record(&self, record: SpawnedPodRecord) -> io::Result<()> { + let parent_name = self.parent_name.clone(); + let parent_scope = self.parent_scope.clone(); + let reclaim_writer = self.reclaim_writer.clone(); + let pod_name = record.pod_name.clone(); + let reclaim = tokio::task::spawn_blocking(move || { + reclaim_removed_record_blocking(parent_name, parent_scope, reclaim_writer, record) + }); + tokio::time::timeout(REGISTRY_CLEANUP_TIMEOUT, reclaim) + .await + .map_err(|_| { + io::Error::new( + io::ErrorKind::TimedOut, + format!("timed out reclaiming spawned pod `{pod_name}`"), + ) + })? + .map_err(|err| io::Error::other(format!("spawned-pod reclaim task failed: {err}")))? } /// Read-only cursor lookup. Returns 0 when no cursor has been set. @@ -288,6 +306,23 @@ where }) } +fn reclaim_removed_record_blocking( + parent_name: Option, + parent_scope: Option, + reclaim_writer: Option, + record: SpawnedPodRecord, +) -> io::Result<()> { + if let Some(parent_name) = parent_name { + reclaim_record(&parent_name, parent_scope.as_ref(), &record)?; + } else { + release_child_allocation(&record.pod_name)?; + } + if let Some(write_reclaim) = reclaim_writer { + write_reclaim(&record)?; + } + Ok(()) +} + fn reclaim_record( parent_name: &str, parent_scope: Option<&SharedScope>, diff --git a/crates/tui/src/dashboard/mod.rs b/crates/tui/src/dashboard/mod.rs index c8e60ef0..1cb2afce 100644 --- a/crates/tui/src/dashboard/mod.rs +++ b/crates/tui/src/dashboard/mod.rs @@ -2890,7 +2890,7 @@ fn build_orchestrator_launch_context( pod_name: &str, ) -> TicketRoleLaunchContext { let mut context = TicketRoleLaunchContext::new( - orchestration_workspace_root.to_path_buf(), + original_workspace_root.to_path_buf(), TicketRole::Orchestrator, ) .with_cwd(orchestration_workspace_root.to_path_buf()) diff --git a/crates/tui/src/dashboard/tests.rs b/crates/tui/src/dashboard/tests.rs index 35065225..57111d42 100644 --- a/crates/tui/src/dashboard/tests.rs +++ b/crates/tui/src/dashboard/tests.rs @@ -16,14 +16,15 @@ fn orchestration_worktree_layout_is_stable_under_original_workspace_root() { } #[test] -fn orchestrator_launch_context_uses_orchestration_root_for_runtime_workspace() { +fn orchestrator_launch_context_uses_original_root_for_runtime_workspace_and_worktree_cwd() { let original = PathBuf::from("/repo/yoi"); let orchestration = original .join(".worktree") .join("orchestration") .join("yoi-orchestrator"); let context = build_orchestrator_launch_context(&original, &orchestration, "yoi-orchestrator"); - assert_eq!(context.workspace_root, orchestration); + assert_eq!(context.workspace_root, original); + assert_eq!(context.cwd.as_deref(), Some(orchestration.as_path())); assert_eq!( context.original_workspace_root.as_deref(), Some(original.as_path())