From 1668e981b437dbf3a5345a0b0e9ba4613727e004 Mon Sep 17 00:00:00 2001 From: Hare Date: Sun, 19 Apr 2026 06:40:45 +0900 Subject: [PATCH] =?UTF-8?q?Pod=E6=93=8D=E4=BD=9C=E3=83=84=E3=83=BC?= =?UTF-8?q?=E3=83=AB=E4=BF=AE=E6=AD=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- crates/pod/src/pod_comm_tools.rs | 64 +++++++++++++++++++++++-- crates/pod/tests/pod_comm_tools_test.rs | 57 +++++++++++++++++++++- tickets/pod-comm-tools.md | 9 ++-- 3 files changed, 120 insertions(+), 10 deletions(-) diff --git a/crates/pod/src/pod_comm_tools.rs b/crates/pod/src/pod_comm_tools.rs index dd37684d..1400afa8 100644 --- a/crates/pod/src/pod_comm_tools.rs +++ b/crates/pod/src/pod_comm_tools.rs @@ -17,7 +17,7 @@ use async_trait::async_trait; use llm_worker::llm_client::types::{ContentPart, Item, Role}; use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput}; use protocol::stream::{JsonLineReader, JsonLineWriter}; -use protocol::{Event, Method}; +use protocol::{ErrorCode, Event, Method}; use serde::Deserialize; use tokio::net::UnixStream; @@ -45,7 +45,8 @@ struct NameInput { const SEND_TO_POD_DESCRIPTION: &str = "Send a text message to a previously spawned Pod. The spawned Pod \ -processes it as a user turn. Does not wait for the Pod's response — \ +processes it as a user turn. Fails if the Pod is already executing a \ +turn — retry after it finishes. Does not wait for the turn to complete; \ use `ReadPodOutput` to fetch results afterwards."; #[derive(Debug, Deserialize, schemars::JsonSchema)] @@ -71,9 +72,17 @@ impl Tool for SendToPodTool { .await .ok_or_else(|| unknown_pod_err(&input.name))?; - connect_and_send(&record.socket_path, &Method::Run { input: input.message }) + send_run_and_confirm(&record.socket_path, input.message) .await - .map_err(|e| ToolError::ExecutionFailed(format!("send to `{}`: {e}", input.name)))?; + .map_err(|e| match e { + SendRunError::AlreadyRunning => ToolError::ExecutionFailed(format!( + "pod `{}` is already running a turn; wait for it to finish and retry", + input.name + )), + SendRunError::Io(msg) => { + ToolError::ExecutionFailed(format!("send to `{}`: {msg}", input.name)) + } + })?; Ok(ToolOutput { summary: format!("sent message to `{}`", input.name), @@ -328,6 +337,51 @@ async fn connect_and_send(socket: &Path, method: &Method) -> std::io::Result<()> Ok(()) } +/// Failure modes distinguished by `SendToPod`. +enum SendRunError { + /// Target Pod responded with `Error { AlreadyRunning }` — the + /// caller can retry once the current turn ends. + AlreadyRunning, + /// Any other failure (connect / write / read / unexpected EOF). + Io(String), +} + +/// Write `Method::Run` to the target and read back events until we see +/// either `TurnStart` (accepted) or `Error { AlreadyRunning }` +/// (rejected). Any replayed notifications that precede the response are +/// skipped. Times out per-read so a stuck Pod doesn't hang the tool. +async fn send_run_and_confirm(socket: &Path, input: String) -> Result<(), SendRunError> { + let stream = tokio::time::timeout(SOCKET_OP_TIMEOUT, UnixStream::connect(socket)) + .await + .map_err(|_| SendRunError::Io("connect timed out".into()))? + .map_err(|e| SendRunError::Io(format!("connect: {e}")))?; + let (r, w) = stream.into_split(); + let mut writer = JsonLineWriter::new(w); + let mut reader = JsonLineReader::new(r); + tokio::time::timeout(SOCKET_OP_TIMEOUT, writer.write(&Method::Run { input })) + .await + .map_err(|_| SendRunError::Io("write timed out".into()))? + .map_err(|e| SendRunError::Io(format!("write: {e}")))?; + loop { + let event = tokio::time::timeout(SOCKET_OP_TIMEOUT, reader.next::()) + .await + .map_err(|_| SendRunError::Io("read timed out".into()))? + .map_err(|e| SendRunError::Io(format!("read: {e}")))?; + match event { + Some(Event::Error { + code: ErrorCode::AlreadyRunning, + .. + }) => return Err(SendRunError::AlreadyRunning), + Some(Event::TurnStart { .. }) => return Ok(()), + // Notifications and other pre-turn events are replayed to + // new subscribers; keep reading until the controller's + // response to our `Run` shows up. + Some(_) => continue, + None => return Err(SendRunError::Io("connection closed before response".into())), + } + } +} + /// Connect and ask the Pod for its conversation history. Skips /// pre-History events (such as buffered notifications replayed to new /// clients). Returns the raw JSON items as `serde_json::Value` since @@ -384,7 +438,7 @@ fn extract_assistant_text(items: &[serde_json::Value]) -> String { for part in content { if let ContentPart::Text { text } = part { if !out.is_empty() { - out.push('\n'); + out.push_str("\n\n"); } out.push_str(&text); } diff --git a/crates/pod/tests/pod_comm_tools_test.rs b/crates/pod/tests/pod_comm_tools_test.rs index c18e7fd7..f570bd74 100644 --- a/crates/pod/tests/pod_comm_tools_test.rs +++ b/crates/pod/tests/pod_comm_tools_test.rs @@ -21,7 +21,7 @@ use pod::runtime_dir::{RuntimeDir, SpawnedPodRecord}; use pod::scope_lock::{self, LockFileGuard}; use pod::spawned_pod_registry::SpawnedPodRegistry; use protocol::stream::{JsonLineReader, JsonLineWriter}; -use protocol::{Event, Greeting, Method}; +use protocol::{ErrorCode, Event, Greeting, Method}; use serde_json::json; use tempfile::TempDir; use tokio::net::UnixListener; @@ -94,6 +94,26 @@ fn accept_one_method(listener: UnixListener) -> JoinHandle> { }) } +/// Accept one connection, read one `Method`, then write `response` +/// back. Used by `SendToPod` tests to mock the real controller's +/// `TurnStart` acknowledgement (or its `AlreadyRunning` rejection). +fn accept_method_and_respond( + listener: UnixListener, + response: Event, +) -> JoinHandle> { + tokio::spawn(async move { + let (stream, _) = listener.accept().await.ok()?; + let (r, w) = stream.into_split(); + let mut reader = JsonLineReader::new(r); + let mut writer = JsonLineWriter::new(w); + let method = reader.next::().await.ok().flatten(); + if method.is_some() { + let _ = writer.write(&response).await; + } + method + }) +} + /// Pretend to be a spawned Pod that responds to `GetHistory` with a /// fixed set of items. Accepts connections until the first one that /// delivers a `GetHistory` method; earlier probes (empty accepts) and @@ -152,7 +172,9 @@ fn assistant(text: &str) -> Item { async fn send_to_pod_delivers_run_method() { let (tmp, registry, _rd) = setup_registry().await; let (socket, listener) = bind_mock_socket(tmp.path(), "child").await; - let received = accept_one_method(listener); + // Mock the controller's accept path: after reading the method, + // ack with `TurnStart` so `SendToPod`'s confirmation loop succeeds. + let received = accept_method_and_respond(listener, Event::TurnStart { turn: 1 }); register_child(®istry, "child", &socket, tmp.path()).await; let def = send_to_pod_tool(registry); @@ -178,6 +200,37 @@ async fn send_to_pod_errors_on_unknown_pod() { assert!(err.to_string().contains("no spawned pod"), "{err}"); } +#[tokio::test] +async fn send_to_pod_errors_when_pod_already_running() { + let (tmp, registry, _rd) = setup_registry().await; + let (socket, listener) = bind_mock_socket(tmp.path(), "child").await; + // Respond with the same `Error { AlreadyRunning }` that the real + // controller emits when `Method::Run` arrives during RUNNING. + let received = accept_method_and_respond( + listener, + Event::Error { + code: ErrorCode::AlreadyRunning, + message: "Pod is already executing a turn".into(), + }, + ); + register_child(®istry, "child", &socket, tmp.path()).await; + + let def = send_to_pod_tool(registry); + let (_meta, tool) = def(); + let input = json!({ "name": "child", "message": "hi" }).to_string(); + let err = tool.execute(&input).await.unwrap_err(); + assert!( + err.to_string().contains("already running"), + "expected AlreadyRunning wording: {err}" + ); + + // Ensure the listener was in fact hit with a Method::Run before the + // rejection path fired — otherwise we'd be asserting on an error + // that came from a connect failure. + let method = received.await.unwrap().expect("expected a method"); + assert!(matches!(method, Method::Run { .. })); +} + // --------------------------------------------------------------------------- // ReadPodOutput // --------------------------------------------------------------------------- diff --git a/tickets/pod-comm-tools.md b/tickets/pod-comm-tools.md index 4c7ffc42..e9d6860b 100644 --- a/tickets/pod-comm-tools.md +++ b/tickets/pod-comm-tools.md @@ -38,7 +38,7 @@ 出力: - 前回読んだ位置以降の assistant テキスト出力 -- 現在の状態(`running` / `idle` / `stopped`) +- 現在の到達性(`alive` / `stopped`) 内部動作: - spawn 記録から socket path を引く @@ -53,11 +53,11 @@ - `name`: 対象の Pod 出力: -- 終了確認 +- 終了要求を送った旨 - 回収された scope の要約 内部動作: -- socket に接続 → `Method::Shutdown` 送信 → 終了確認受信 → 切断 +- socket に接続 → `Method::Shutdown` 送信(応答は待たない)→ 切断 - scope lock file を flock → 対象の allocation 削除 → spawner の deny を解除 → unlock - spawn 記録から対象を削除 @@ -94,3 +94,6 @@ - コールバック通知は `tickets/pod-callback.md` - Pod ネットワークの GUI / TUI 可視化 +- spawner プロセス再起動後の `spawned_pods.json` からの復旧(現状は write-through のみ) +- `ReadPodOutput` カーソルの永続化(インメモリのみ、再起動で 0 に戻る) +- Pod の詳細ステータス(`running` / `idle`)を `Event::History` に含める拡張