From 8019d0d77cd2612764ecb6e3cda6e0dee38d22ec Mon Sep 17 00:00:00 2001 From: Hare Date: Fri, 15 May 2026 04:38:53 +0900 Subject: [PATCH] =?UTF-8?q?update:=20=E8=A6=AA=E3=81=AB=E3=82=BF=E3=83=BC?= =?UTF-8?q?=E3=83=B3=E5=AE=8C=E4=BA=86=E3=82=92=E9=80=9A=E9=81=94=E3=81=99?= =?UTF-8?q?=E3=82=8B=E7=B5=8C=E8=B7=AF=E3=81=AE=E6=95=B4=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- crates/pod/src/controller.rs | 264 +++++++++++++++++++++++++++- tickets/pod-parent-turn-callback.md | 26 +-- 2 files changed, 271 insertions(+), 19 deletions(-) diff --git a/crates/pod/src/controller.rs b/crates/pod/src/controller.rs index ff748ea6..49f3b7a1 100644 --- a/crates/pod/src/controller.rs +++ b/crates/pod/src/controller.rs @@ -99,6 +99,21 @@ enum PendingRun { Resume, } +impl PendingRun { + /// Whether this turn was kicked off by the parent (via `Method::Run` + /// or `Method::Resume`). Used by [`drive_turn`] to gate upward + /// `PodEvent::TurnEnded` / `PodEvent::Errored` reports so the parent + /// only sees completion signals for work it actually delegated. + /// `RunForNotification` covers self-initiated turns kicked from the + /// notify buffer (Notify / inbound PodEvent) and stays silent. + fn is_parent_originated(&self) -> bool { + match self { + PendingRun::Run(_) | PendingRun::InterruptAndRun(_) | PendingRun::Resume => true, + PendingRun::RunForNotification => false, + } + } +} + // --------------------------------------------------------------------------- // PodController — actor that owns a Pod // --------------------------------------------------------------------------- @@ -504,6 +519,7 @@ async fn controller_loop( // in one place, regardless of which Method caused it. if let Some(run) = pending.take() { set_controller_status(&shared_state, &runtime_dir, &event_tx, PodStatus::Running).await; + let parent_originated = run.is_parent_originated(); let (new_status, shutdown) = match run { PendingRun::Run(input) => { drive_turn( @@ -516,6 +532,7 @@ async fn controller_loop( self_parent_socket.as_ref(), &spawner_name, &spawned_registry, + parent_originated, ) .await } @@ -530,6 +547,7 @@ async fn controller_loop( self_parent_socket.as_ref(), &spawner_name, &spawned_registry, + parent_originated, ) .await } @@ -544,6 +562,7 @@ async fn controller_loop( self_parent_socket.as_ref(), &spawner_name, &spawned_registry, + parent_originated, ) .await } @@ -558,6 +577,7 @@ async fn controller_loop( self_parent_socket.as_ref(), &spawner_name, &spawned_registry, + parent_originated, ) .await } @@ -736,6 +756,12 @@ async fn controller_loop( /// `None` parent skips the send (top-level Pod). Transient method /// rejections such as `AlreadyRunning` are intentionally NOT reported /// as `Errored` — only the worker-execution `Err` branch below fires. +/// +/// `parent_originated` further restricts both upward reports to turns +/// the parent actually delegated (`Method::Run` / `Method::Resume`). +/// `Method::Notify` / inbound `PodEvent` auto-kicks complete silently +/// so the parent's history does not get flooded with child-internal +/// turn boundaries. #[allow(clippy::too_many_arguments)] async fn drive_turn( pod_future: F, @@ -747,6 +773,7 @@ async fn drive_turn( parent_socket: Option<&PathBuf>, self_name: &str, spawned_registry: &Arc, + parent_originated: bool, ) -> (PodStatus, bool) where F: std::future::Future>, @@ -766,7 +793,7 @@ where PodRunResult::LimitReached => (PodStatus::Idle, RunResult::LimitReached), }; let _ = event_tx.send(Event::RunEnd { result: run_result }); - if matches!(run_result, RunResult::Finished) { + if parent_originated && matches!(run_result, RunResult::Finished) { crate::ipc::event::fire_and_forget( parent_socket.cloned(), protocol::PodEvent::TurnEnded { @@ -792,13 +819,15 @@ where code, message: message.clone(), }); - crate::ipc::event::fire_and_forget( - parent_socket.cloned(), - protocol::PodEvent::Errored { - pod_name: self_name.to_string(), - message, - }, - ); + if parent_originated { + crate::ipc::event::fire_and_forget( + parent_socket.cloned(), + protocol::PodEvent::Errored { + pod_name: self_name.to_string(), + message, + }, + ); + } (PodStatus::Idle, shutdown_requested) } }; @@ -919,3 +948,222 @@ fn worker_error_code(e: &PodError) -> ErrorCode { _ => ErrorCode::Internal, } } + +#[cfg(test)] +mod tests { + use super::*; + use protocol::PodEvent; + use protocol::stream::JsonLineReader; + use std::time::Duration; + use tempfile::TempDir; + use tokio::net::UnixListener; + + #[test] + fn pending_run_parent_origin_table() { + assert!(PendingRun::Run(Vec::new()).is_parent_originated()); + assert!(PendingRun::InterruptAndRun(Vec::new()).is_parent_originated()); + assert!(PendingRun::Resume.is_parent_originated()); + assert!(!PendingRun::RunForNotification.is_parent_originated()); + } + + struct DriveTurnEnv { + // Held to keep the channel alive; without this `method_rx.recv()` + // would observe channel-closed and confuse the select! arm. + _method_tx: mpsc::Sender, + method_rx: mpsc::Receiver, + event_tx: broadcast::Sender, + cancel_tx: mpsc::Sender<()>, + _cancel_rx: mpsc::Receiver<()>, + shared_state: Arc, + notify_buffer: NotifyBuffer, + spawned_registry: Arc, + parent_socket_path: PathBuf, + _runtime_dir: Arc, + _temp: TempDir, + } + + async fn make_env() -> DriveTurnEnv { + let temp = tempfile::tempdir().expect("tempdir"); + let runtime_dir = Arc::new( + RuntimeDir::create(temp.path(), "child-pod") + .await + .expect("runtime dir create"), + ); + let (method_tx, method_rx) = mpsc::channel::(16); + let (event_tx, _) = broadcast::channel::(16); + let (cancel_tx, cancel_rx) = mpsc::channel::<()>(1); + let shared_state = Arc::new(PodSharedState::new( + "child-pod".to_string(), + session_store::new_session_id(), + String::new(), + protocol::Greeting { + pod_name: "child-pod".to_string(), + cwd: String::new(), + provider: String::new(), + model: String::new(), + scope_summary: String::new(), + tools: Vec::new(), + }, + )); + let notify_buffer = NotifyBuffer::new(); + let spawned_registry = SpawnedPodRegistry::new(runtime_dir.clone()); + let parent_socket_path = temp.path().join("parent.sock"); + + DriveTurnEnv { + _method_tx: method_tx, + method_rx, + event_tx, + cancel_tx, + _cancel_rx: cancel_rx, + shared_state, + notify_buffer, + spawned_registry, + parent_socket_path, + _runtime_dir: runtime_dir, + _temp: temp, + } + } + + /// Listen on a bound UnixListener for one inbound connection and + /// return the first `Method::PodEvent` read from it. Returns `None` + /// on timeout / EOF / non-PodEvent. + async fn recv_pod_event(listener: UnixListener, timeout: Duration) -> Option { + let accept = async { + let (stream, _) = listener.accept().await.ok()?; + let mut reader = JsonLineReader::new(stream); + match reader.next::().await { + Ok(Some(Method::PodEvent(e))) => Some(e), + _ => None, + } + }; + tokio::time::timeout(timeout, accept).await.ok().flatten() + } + + #[tokio::test] + async fn parent_originated_finished_fires_turn_ended() { + let mut env = make_env().await; + let listener = UnixListener::bind(&env.parent_socket_path).expect("bind listener"); + let recv = tokio::spawn(recv_pod_event(listener, Duration::from_secs(2))); + + let pod_future = async { Ok::<_, PodError>(PodRunResult::Finished) }; + let (status, shutdown) = drive_turn( + pod_future, + &mut env.method_rx, + &env.event_tx, + &env.cancel_tx, + &env.shared_state, + &env.notify_buffer, + Some(&env.parent_socket_path), + "child-pod", + &env.spawned_registry, + true, + ) + .await; + assert_eq!(status, PodStatus::Idle); + assert!(!shutdown); + + let event = recv.await.expect("recv task").expect("PodEvent received"); + match event { + PodEvent::TurnEnded { pod_name } => assert_eq!(pod_name, "child-pod"), + other => panic!("expected TurnEnded, got {other:?}"), + } + } + + #[tokio::test] + async fn non_parent_originated_finished_stays_silent() { + let mut env = make_env().await; + let listener = UnixListener::bind(&env.parent_socket_path).expect("bind listener"); + + let pod_future = async { Ok::<_, PodError>(PodRunResult::Finished) }; + let (status, _) = drive_turn( + pod_future, + &mut env.method_rx, + &env.event_tx, + &env.cancel_tx, + &env.shared_state, + &env.notify_buffer, + Some(&env.parent_socket_path), + "child-pod", + &env.spawned_registry, + false, + ) + .await; + assert_eq!(status, PodStatus::Idle); + + // Wait long enough for any (incorrect) fire-and-forget send to + // land; expect the accept to time out. + let accept = tokio::time::timeout(Duration::from_millis(200), listener.accept()).await; + assert!( + accept.is_err(), + "expected no PodEvent for non-parent-originated turn" + ); + } + + #[tokio::test] + async fn parent_originated_worker_error_fires_errored() { + let mut env = make_env().await; + let listener = UnixListener::bind(&env.parent_socket_path).expect("bind listener"); + let recv = tokio::spawn(recv_pod_event(listener, Duration::from_secs(2))); + + let pod_future = async { + Err::(PodError::Worker(WorkerError::Aborted( + "boom from test".into(), + ))) + }; + let (status, _) = drive_turn( + pod_future, + &mut env.method_rx, + &env.event_tx, + &env.cancel_tx, + &env.shared_state, + &env.notify_buffer, + Some(&env.parent_socket_path), + "child-pod", + &env.spawned_registry, + true, + ) + .await; + assert_eq!(status, PodStatus::Idle); + + let event = recv.await.expect("recv task").expect("PodEvent received"); + match event { + PodEvent::Errored { pod_name, message } => { + assert_eq!(pod_name, "child-pod"); + assert!(message.contains("boom from test"), "got message: {message}"); + } + other => panic!("expected Errored, got {other:?}"), + } + } + + #[tokio::test] + async fn non_parent_originated_worker_error_stays_silent() { + let mut env = make_env().await; + let listener = UnixListener::bind(&env.parent_socket_path).expect("bind listener"); + + let pod_future = async { + Err::(PodError::Worker(WorkerError::Aborted( + "boom from notify".into(), + ))) + }; + let (status, _) = drive_turn( + pod_future, + &mut env.method_rx, + &env.event_tx, + &env.cancel_tx, + &env.shared_state, + &env.notify_buffer, + Some(&env.parent_socket_path), + "child-pod", + &env.spawned_registry, + false, + ) + .await; + assert_eq!(status, PodStatus::Idle); + + let accept = tokio::time::timeout(Duration::from_millis(200), listener.accept()).await; + assert!( + accept.is_err(), + "expected no PodEvent for notification-originated worker error" + ); + } +} diff --git a/tickets/pod-parent-turn-callback.md b/tickets/pod-parent-turn-callback.md index 7c639b90..1e6ede36 100644 --- a/tickets/pod-parent-turn-callback.md +++ b/tickets/pod-parent-turn-callback.md @@ -2,12 +2,15 @@ ## 背景 -子 Pod は `controller.rs` の `run_with_cancel_support` で turn 終了時に `PodEvent::TurnEnded` / `PodEvent::Errored` を `parent_socket` 宛に fire-and-forget している。現状この発火条件は「`pod_future` が `Finished` / 失敗で抜けたら」であり、その turn が何起点かを区別していない。 +子 Pod は `controller.rs` の `drive_turn` で turn 終了時に `PodEvent::TurnEnded` / `PodEvent::Errored` を `parent_socket` 宛に fire-and-forget している。現状この発火条件は「`pod_future` が `Finished` / 失敗で抜けたら」であり、その turn が何起点かを区別していない。 + +`controller_loop` は次の turn を `PendingRun` enum (`Run` / `InterruptAndRun` / `RunForNotification` / `Resume`) として stage してから `drive_turn` を呼ぶ構造になっており、ここに turn の起源情報がすでに揃っている。 子 Pod のターンは複数の経路で開始しうる: -- 親からの `Method::Run`(`SpawnPod` の初回起動 / `SendToPod`) -- 子内部での `Method::Notify` 起点の auto-kick(`run_for_notification`)。Notify の出どころは孫 Pod からの `PodEvent`、system reminder、外部 Notify など多岐にわたる +- 親からの `Method::Run`(`SpawnPod` の初回起動 / `SendToPod`)。Pod が Paused 中なら `PendingRun::InterruptAndRun`、それ以外は `PendingRun::Run` に分岐するが、どちらも親由来。 +- 子内部での `Method::Notify` または `Method::PodEvent` 起点の idle auto-kick(`PendingRun::RunForNotification` → `pod.run_for_notification()`)。Notify / PodEvent の出どころは孫 Pod の `PodEvent`、system reminder、外部 Notify など多岐にわたる。 +- 親からの `Method::Resume`(`PendingRun::Resume`) - 将来的に増える可能性のある自走経路 入れ子 Pod を本格的に使い始めると、子の Notify 起点 turn が頻発する。これらが完了するたびに親へ `TurnEnded` が飛ぶと、親の `NotifyBuffer` には「自分が投げていないターンの完了」が積まれ、`pending_history_appends` で history に system message として commit され、親まで auto-kick される。親の history は「自分が把握していない子の内部活動」のノイズで埋まり、auto-kick の連鎖も意味的に正当化しづらい。 @@ -16,11 +19,12 @@ ## 要件 -- 子 Pod が parent へ送る `PodEvent::TurnEnded` は、親由来の `Method::Run` を起点とする turn が `Finished` で完了した場合に限る。 - - 「親由来」の判定は「`Method::Run` で開始された turn」とする。SpawnPod 初回起動 / SendToPod はどちらも `Method::Run` 経由なので両方対象になる。 - - `run_for_notification` 起点の turn は完了しても `TurnEnded` を上げない。 - - `Method::Resume` 起点の turn は親由来として扱う(親が再開を指示した turn のため)。 -- `PodEvent::Errored` も同じスコープに揃える。Notify 起点 turn の worker 失敗は `parent_socket` への報告対象外とする(親が知らない指示の失敗報告になるため)。`Event::Error` でローカルに通知される経路は維持。 +- 子 Pod が parent へ送る `PodEvent::TurnEnded` は、親由来 turn が `Finished` で完了した場合に限る。 + - 「親由来」の判定は `PendingRun` のバリアントベースとする。`PendingRun::Run` / `PendingRun::InterruptAndRun` / `PendingRun::Resume` の3つを親由来として扱う。 + - `Run` / `InterruptAndRun`: いずれも `Method::Run` 経由(SpawnPod 初回起動 / SendToPod。Paused 中なら `InterruptAndRun` に分岐するが、起点は親の `Method::Run`)。 + - `Resume`: 親の `Method::Resume` 指示で再開した turn のため親由来扱い。 + - `PendingRun::RunForNotification` 起点の turn は完了しても `TurnEnded` を上げない。`Method::Notify` 経由でも `Method::PodEvent` 経由でも同様。 +- `PodEvent::Errored` も同じスコープに揃える。`RunForNotification` 起点 turn の worker 失敗は `parent_socket` への報告対象外とする(親が知らない指示の失敗報告になるため)。`Event::Error` でローカルに通知される経路は維持。 - `PodEvent::ShutDown` は turn とは独立した Pod プロセス終了通知なので、本チケットの対象外(従来通り常に発火)。 - `ScopeSubDelegated` も turn とは独立しているので対象外。 - 親側の受信処理(`apply_event_side_effects` / `NotifyBuffer` への投入 / history への append)は変更しない。発火源を絞ることで自然にノイズが減る前提。 @@ -28,9 +32,9 @@ ## 完了条件 - 子 Pod を spawn して `SpawnPod` の初回 Run または `SendToPod` で投げた turn が `Finished` で完了すると、親の history に当該 `TurnEnded` 由来の system message が 1 件 append される。 -- 子 Pod が孫 Pod からの `PodEvent::TurnEnded`(または他の Notify)で auto-kick された turn が完了しても、親の history には何も append されない。 -- 親由来 turn が worker エラーで失敗すると親に `Errored` が届く。Notify 起点 turn の worker エラーは親に届かない。 -- ユニットテストで「`Method::Run` 完了 → 親に届く」「`run_for_notification` 完了 → 親に届かない」「`Method::Resume` 完了 → 親に届く」「Notify 起点 turn の Errored → 親に届かない」を最低限カバーする。 +- 子 Pod が孫 Pod からの `PodEvent`(または他の Notify)で auto-kick された turn が完了しても、親の history には何も append されない。 +- 親由来 turn が worker エラーで失敗すると親に `Errored` が届く。`RunForNotification` 起点 turn の worker エラーは親に届かない。 +- ユニットテストで「`PendingRun::Run` 完了 → 親に届く」「`PendingRun::RunForNotification` 完了 → 親に届かない」「`PendingRun::Resume` 完了 → 親に届く」「`RunForNotification` 起点 turn の Errored → 親に届かない」を最低限カバーする。 ## 範囲外