update: 親にターン完了を通達する経路の整理

This commit is contained in:
Keisuke Hirata 2026-05-15 04:38:53 +09:00
parent 6e5b1482e6
commit 8019d0d77c
No known key found for this signature in database
2 changed files with 271 additions and 19 deletions

View File

@ -99,6 +99,21 @@ enum PendingRun {
Resume,
}
impl PendingRun {
/// Whether this turn was kicked off by the parent (via `Method::Run`
/// or `Method::Resume`). Used by [`drive_turn`] to gate upward
/// `PodEvent::TurnEnded` / `PodEvent::Errored` reports so the parent
/// only sees completion signals for work it actually delegated.
/// `RunForNotification` covers self-initiated turns kicked from the
/// notify buffer (Notify / inbound PodEvent) and stays silent.
fn is_parent_originated(&self) -> bool {
match self {
PendingRun::Run(_) | PendingRun::InterruptAndRun(_) | PendingRun::Resume => true,
PendingRun::RunForNotification => false,
}
}
}
// ---------------------------------------------------------------------------
// PodController — actor that owns a Pod
// ---------------------------------------------------------------------------
@ -504,6 +519,7 @@ async fn controller_loop<C, St>(
// in one place, regardless of which Method caused it.
if let Some(run) = pending.take() {
set_controller_status(&shared_state, &runtime_dir, &event_tx, PodStatus::Running).await;
let parent_originated = run.is_parent_originated();
let (new_status, shutdown) = match run {
PendingRun::Run(input) => {
drive_turn(
@ -516,6 +532,7 @@ async fn controller_loop<C, St>(
self_parent_socket.as_ref(),
&spawner_name,
&spawned_registry,
parent_originated,
)
.await
}
@ -530,6 +547,7 @@ async fn controller_loop<C, St>(
self_parent_socket.as_ref(),
&spawner_name,
&spawned_registry,
parent_originated,
)
.await
}
@ -544,6 +562,7 @@ async fn controller_loop<C, St>(
self_parent_socket.as_ref(),
&spawner_name,
&spawned_registry,
parent_originated,
)
.await
}
@ -558,6 +577,7 @@ async fn controller_loop<C, St>(
self_parent_socket.as_ref(),
&spawner_name,
&spawned_registry,
parent_originated,
)
.await
}
@ -736,6 +756,12 @@ async fn controller_loop<C, St>(
/// `None` parent skips the send (top-level Pod). Transient method
/// rejections such as `AlreadyRunning` are intentionally NOT reported
/// as `Errored` — only the worker-execution `Err` branch below fires.
///
/// `parent_originated` further restricts both upward reports to turns
/// the parent actually delegated (`Method::Run` / `Method::Resume`).
/// `Method::Notify` / inbound `PodEvent` auto-kicks complete silently
/// so the parent's history does not get flooded with child-internal
/// turn boundaries.
#[allow(clippy::too_many_arguments)]
async fn drive_turn<F>(
pod_future: F,
@ -747,6 +773,7 @@ async fn drive_turn<F>(
parent_socket: Option<&PathBuf>,
self_name: &str,
spawned_registry: &Arc<SpawnedPodRegistry>,
parent_originated: bool,
) -> (PodStatus, bool)
where
F: std::future::Future<Output = Result<PodRunResult, PodError>>,
@ -766,7 +793,7 @@ where
PodRunResult::LimitReached => (PodStatus::Idle, RunResult::LimitReached),
};
let _ = event_tx.send(Event::RunEnd { result: run_result });
if matches!(run_result, RunResult::Finished) {
if parent_originated && matches!(run_result, RunResult::Finished) {
crate::ipc::event::fire_and_forget(
parent_socket.cloned(),
protocol::PodEvent::TurnEnded {
@ -792,6 +819,7 @@ where
code,
message: message.clone(),
});
if parent_originated {
crate::ipc::event::fire_and_forget(
parent_socket.cloned(),
protocol::PodEvent::Errored {
@ -799,6 +827,7 @@ where
message,
},
);
}
(PodStatus::Idle, shutdown_requested)
}
};
@ -919,3 +948,222 @@ fn worker_error_code(e: &PodError) -> ErrorCode {
_ => ErrorCode::Internal,
}
}
#[cfg(test)]
mod tests {
use super::*;
use protocol::PodEvent;
use protocol::stream::JsonLineReader;
use std::time::Duration;
use tempfile::TempDir;
use tokio::net::UnixListener;
#[test]
fn pending_run_parent_origin_table() {
assert!(PendingRun::Run(Vec::new()).is_parent_originated());
assert!(PendingRun::InterruptAndRun(Vec::new()).is_parent_originated());
assert!(PendingRun::Resume.is_parent_originated());
assert!(!PendingRun::RunForNotification.is_parent_originated());
}
struct DriveTurnEnv {
// Held to keep the channel alive; without this `method_rx.recv()`
// would observe channel-closed and confuse the select! arm.
_method_tx: mpsc::Sender<Method>,
method_rx: mpsc::Receiver<Method>,
event_tx: broadcast::Sender<Event>,
cancel_tx: mpsc::Sender<()>,
_cancel_rx: mpsc::Receiver<()>,
shared_state: Arc<PodSharedState>,
notify_buffer: NotifyBuffer,
spawned_registry: Arc<SpawnedPodRegistry>,
parent_socket_path: PathBuf,
_runtime_dir: Arc<RuntimeDir>,
_temp: TempDir,
}
async fn make_env() -> DriveTurnEnv {
let temp = tempfile::tempdir().expect("tempdir");
let runtime_dir = Arc::new(
RuntimeDir::create(temp.path(), "child-pod")
.await
.expect("runtime dir create"),
);
let (method_tx, method_rx) = mpsc::channel::<Method>(16);
let (event_tx, _) = broadcast::channel::<Event>(16);
let (cancel_tx, cancel_rx) = mpsc::channel::<()>(1);
let shared_state = Arc::new(PodSharedState::new(
"child-pod".to_string(),
session_store::new_session_id(),
String::new(),
protocol::Greeting {
pod_name: "child-pod".to_string(),
cwd: String::new(),
provider: String::new(),
model: String::new(),
scope_summary: String::new(),
tools: Vec::new(),
},
));
let notify_buffer = NotifyBuffer::new();
let spawned_registry = SpawnedPodRegistry::new(runtime_dir.clone());
let parent_socket_path = temp.path().join("parent.sock");
DriveTurnEnv {
_method_tx: method_tx,
method_rx,
event_tx,
cancel_tx,
_cancel_rx: cancel_rx,
shared_state,
notify_buffer,
spawned_registry,
parent_socket_path,
_runtime_dir: runtime_dir,
_temp: temp,
}
}
/// Listen on a bound UnixListener for one inbound connection and
/// return the first `Method::PodEvent` read from it. Returns `None`
/// on timeout / EOF / non-PodEvent.
async fn recv_pod_event(listener: UnixListener, timeout: Duration) -> Option<PodEvent> {
let accept = async {
let (stream, _) = listener.accept().await.ok()?;
let mut reader = JsonLineReader::new(stream);
match reader.next::<Method>().await {
Ok(Some(Method::PodEvent(e))) => Some(e),
_ => None,
}
};
tokio::time::timeout(timeout, accept).await.ok().flatten()
}
#[tokio::test]
async fn parent_originated_finished_fires_turn_ended() {
let mut env = make_env().await;
let listener = UnixListener::bind(&env.parent_socket_path).expect("bind listener");
let recv = tokio::spawn(recv_pod_event(listener, Duration::from_secs(2)));
let pod_future = async { Ok::<_, PodError>(PodRunResult::Finished) };
let (status, shutdown) = drive_turn(
pod_future,
&mut env.method_rx,
&env.event_tx,
&env.cancel_tx,
&env.shared_state,
&env.notify_buffer,
Some(&env.parent_socket_path),
"child-pod",
&env.spawned_registry,
true,
)
.await;
assert_eq!(status, PodStatus::Idle);
assert!(!shutdown);
let event = recv.await.expect("recv task").expect("PodEvent received");
match event {
PodEvent::TurnEnded { pod_name } => assert_eq!(pod_name, "child-pod"),
other => panic!("expected TurnEnded, got {other:?}"),
}
}
#[tokio::test]
async fn non_parent_originated_finished_stays_silent() {
let mut env = make_env().await;
let listener = UnixListener::bind(&env.parent_socket_path).expect("bind listener");
let pod_future = async { Ok::<_, PodError>(PodRunResult::Finished) };
let (status, _) = drive_turn(
pod_future,
&mut env.method_rx,
&env.event_tx,
&env.cancel_tx,
&env.shared_state,
&env.notify_buffer,
Some(&env.parent_socket_path),
"child-pod",
&env.spawned_registry,
false,
)
.await;
assert_eq!(status, PodStatus::Idle);
// Wait long enough for any (incorrect) fire-and-forget send to
// land; expect the accept to time out.
let accept = tokio::time::timeout(Duration::from_millis(200), listener.accept()).await;
assert!(
accept.is_err(),
"expected no PodEvent for non-parent-originated turn"
);
}
#[tokio::test]
async fn parent_originated_worker_error_fires_errored() {
let mut env = make_env().await;
let listener = UnixListener::bind(&env.parent_socket_path).expect("bind listener");
let recv = tokio::spawn(recv_pod_event(listener, Duration::from_secs(2)));
let pod_future = async {
Err::<PodRunResult, _>(PodError::Worker(WorkerError::Aborted(
"boom from test".into(),
)))
};
let (status, _) = drive_turn(
pod_future,
&mut env.method_rx,
&env.event_tx,
&env.cancel_tx,
&env.shared_state,
&env.notify_buffer,
Some(&env.parent_socket_path),
"child-pod",
&env.spawned_registry,
true,
)
.await;
assert_eq!(status, PodStatus::Idle);
let event = recv.await.expect("recv task").expect("PodEvent received");
match event {
PodEvent::Errored { pod_name, message } => {
assert_eq!(pod_name, "child-pod");
assert!(message.contains("boom from test"), "got message: {message}");
}
other => panic!("expected Errored, got {other:?}"),
}
}
#[tokio::test]
async fn non_parent_originated_worker_error_stays_silent() {
let mut env = make_env().await;
let listener = UnixListener::bind(&env.parent_socket_path).expect("bind listener");
let pod_future = async {
Err::<PodRunResult, _>(PodError::Worker(WorkerError::Aborted(
"boom from notify".into(),
)))
};
let (status, _) = drive_turn(
pod_future,
&mut env.method_rx,
&env.event_tx,
&env.cancel_tx,
&env.shared_state,
&env.notify_buffer,
Some(&env.parent_socket_path),
"child-pod",
&env.spawned_registry,
false,
)
.await;
assert_eq!(status, PodStatus::Idle);
let accept = tokio::time::timeout(Duration::from_millis(200), listener.accept()).await;
assert!(
accept.is_err(),
"expected no PodEvent for notification-originated worker error"
);
}
}

View File

@ -2,12 +2,15 @@
## 背景
子 Pod は `controller.rs``run_with_cancel_support` で turn 終了時に `PodEvent::TurnEnded` / `PodEvent::Errored``parent_socket` 宛に fire-and-forget している。現状この発火条件は「`pod_future` が `Finished` / 失敗で抜けたら」であり、その turn が何起点かを区別していない。
子 Pod は `controller.rs``drive_turn` で turn 終了時に `PodEvent::TurnEnded` / `PodEvent::Errored``parent_socket` 宛に fire-and-forget している。現状この発火条件は「`pod_future` が `Finished` / 失敗で抜けたら」であり、その turn が何起点かを区別していない。
`controller_loop` は次の turn を `PendingRun` enum (`Run` / `InterruptAndRun` / `RunForNotification` / `Resume`) として stage してから `drive_turn` を呼ぶ構造になっており、ここに turn の起源情報がすでに揃っている。
子 Pod のターンは複数の経路で開始しうる:
- 親からの `Method::Run``SpawnPod` の初回起動 / `SendToPod`
- 子内部での `Method::Notify` 起点の auto-kick`run_for_notification`。Notify の出どころは孫 Pod からの `PodEvent`、system reminder、外部 Notify など多岐にわたる
- 親からの `Method::Run``SpawnPod` の初回起動 / `SendToPod`。Pod が Paused 中なら `PendingRun::InterruptAndRun`、それ以外は `PendingRun::Run` に分岐するが、どちらも親由来。
- 子内部での `Method::Notify` または `Method::PodEvent` 起点の idle auto-kick`PendingRun::RunForNotification` → `pod.run_for_notification()`。Notify / PodEvent の出どころは孫 Pod の `PodEvent`、system reminder、外部 Notify など多岐にわたる。
- 親からの `Method::Resume``PendingRun::Resume`
- 将来的に増える可能性のある自走経路
入れ子 Pod を本格的に使い始めると、子の Notify 起点 turn が頻発する。これらが完了するたびに親へ `TurnEnded` が飛ぶと、親の `NotifyBuffer` には「自分が投げていないターンの完了」が積まれ、`pending_history_appends` で history に system message として commit され、親まで auto-kick される。親の history は「自分が把握していない子の内部活動」のイズで埋まり、auto-kick の連鎖も意味的に正当化しづらい。
@ -16,11 +19,12 @@
## 要件
- 子 Pod が parent へ送る `PodEvent::TurnEnded` は、親由来の `Method::Run` を起点とする turn が `Finished` で完了した場合に限る。
- 「親由来」の判定は「`Method::Run` で開始された turn」とする。SpawnPod 初回起動 / SendToPod はどちらも `Method::Run` 経由なので両方対象になる。
- `run_for_notification` 起点の turn は完了しても `TurnEnded` を上げない。
- `Method::Resume` 起点の turn は親由来として扱う(親が再開を指示した turn のため)。
- `PodEvent::Errored` も同じスコープに揃える。Notify 起点 turn の worker 失敗は `parent_socket` への報告対象外とする(親が知らない指示の失敗報告になるため)。`Event::Error` でローカルに通知される経路は維持。
- 子 Pod が parent へ送る `PodEvent::TurnEnded` は、親由来 turn が `Finished` で完了した場合に限る。
- 「親由来」の判定は `PendingRun` のバリアントベースとする。`PendingRun::Run` / `PendingRun::InterruptAndRun` / `PendingRun::Resume` の3つを親由来として扱う。
- `Run` / `InterruptAndRun`: いずれも `Method::Run` 経由SpawnPod 初回起動 / SendToPod。Paused 中なら `InterruptAndRun` に分岐するが、起点は親の `Method::Run`)。
- `Resume`: 親の `Method::Resume` 指示で再開した turn のため親由来扱い。
- `PendingRun::RunForNotification` 起点の turn は完了しても `TurnEnded` を上げない。`Method::Notify` 経由でも `Method::PodEvent` 経由でも同様。
- `PodEvent::Errored` も同じスコープに揃える。`RunForNotification` 起点 turn の worker 失敗は `parent_socket` への報告対象外とする(親が知らない指示の失敗報告になるため)。`Event::Error` でローカルに通知される経路は維持。
- `PodEvent::ShutDown` は turn とは独立した Pod プロセス終了通知なので、本チケットの対象外(従来通り常に発火)。
- `ScopeSubDelegated` も turn とは独立しているので対象外。
- 親側の受信処理(`apply_event_side_effects` / `NotifyBuffer` への投入 / history への appendは変更しない。発火源を絞ることで自然にイズが減る前提。
@ -28,9 +32,9 @@
## 完了条件
- 子 Pod を spawn して `SpawnPod` の初回 Run または `SendToPod` で投げた turn が `Finished` で完了すると、親の history に当該 `TurnEnded` 由来の system message が 1 件 append される。
- 子 Pod が孫 Pod からの `PodEvent::TurnEnded`(または他の Notifyで auto-kick された turn が完了しても、親の history には何も append されない。
- 親由来 turn が worker エラーで失敗すると親に `Errored` が届く。Notify 起点 turn の worker エラーは親に届かない。
- ユニットテストで「`Method::Run` 完了 → 親に届く」「`run_for_notification` 完了 → 親に届かない」「`Method::Resume` 完了 → 親に届く」「Notify 起点 turn の Errored → 親に届かない」を最低限カバーする。
- 子 Pod が孫 Pod からの `PodEvent`(または他の Notifyで auto-kick された turn が完了しても、親の history には何も append されない。
- 親由来 turn が worker エラーで失敗すると親に `Errored` が届く。`RunForNotification` 起点 turn の worker エラーは親に届かない。
- ユニットテストで「`PendingRun::Run` 完了 → 親に届く」「`PendingRun::RunForNotification` 完了 → 親に届かない」「`PendingRun::Resume` 完了 → 親に届く」「`RunForNotification` 起点 turn の Errored → 親に届かない」を最低限カバーする。
## 範囲外