From 8e50a9583ac3b03213a08f4b571246da910dfebf Mon Sep 17 00:00:00 2001 From: Hare Date: Wed, 13 May 2026 06:07:38 +0900 Subject: [PATCH] =?UTF-8?q?refactor:=20PodController=E3=81=AE=E6=A7=8B?= =?UTF-8?q?=E9=80=A0=E3=81=AE=E3=83=AA=E3=83=95=E3=82=A1=E3=82=AF=E3=82=BF?= =?UTF-8?q?=E3=83=AA=E3=83=B3=E3=82=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- TODO.md | 1 - crates/pod/src/controller.rs | 1051 ++++++++++++++-------------- tickets/pod-event-loop-dispatch.md | 65 -- 3 files changed, 525 insertions(+), 592 deletions(-) delete mode 100644 tickets/pod-event-loop-dispatch.md diff --git a/TODO.md b/TODO.md index 9c143bb5..1dab3486 100644 --- a/TODO.md +++ b/TODO.md @@ -9,7 +9,6 @@ - Pod: 任意ターンからの Fork(複数ターン巻き戻しを汎用化) → [tickets/pod-session-fork.md](tickets/pod-session-fork.md) - Pod: 子→親の TurnEnded/Errored callback を親由来ターンのみに絞る → [tickets/pod-parent-turn-callback.md](tickets/pod-parent-turn-callback.md) - Pod: セッションログをバックエンドにした Pod 単位の永続化 → [tickets/pod-persistent-state.md](tickets/pod-persistent-state.md) -- Pod: イベントハンドラからターン起動を分離(描画ループ式 dispatch) → [tickets/pod-event-loop-dispatch.md](tickets/pod-event-loop-dispatch.md) - 永続化層のセマンティック整理 → [tickets/persistence-semantics.md](tickets/persistence-semantics.md) - Exchange / Turn / Call セマンティクス整理 → [tickets/exchange-turn-call-semantics.md](tickets/exchange-turn-call-semantics.md) - llm-worker のエラー耐性 diff --git a/crates/pod/src/controller.rs b/crates/pod/src/controller.rs index 7e754988..07e39061 100644 --- a/crates/pod/src/controller.rs +++ b/crates/pod/src/controller.rs @@ -1,4 +1,4 @@ -use std::path::Path; +use std::path::{Path, PathBuf}; use std::sync::Arc; use llm_worker::WorkerError; @@ -19,7 +19,7 @@ use crate::spawn::comm_tools::{ use crate::spawn::registry::SpawnedPodRegistry; use crate::spawn::tool::spawn_pod_tool; use protocol::{ - AlertLevel, AlertSource, ErrorCode, Event, Method, PodStatus, RunResult, TurnResult, + AlertLevel, AlertSource, ErrorCode, Event, Method, PodStatus, RunResult, Segment, TurnResult, }; fn is_system_message_item(item: &Item) -> bool { @@ -94,6 +94,17 @@ async fn finish_controller_run( pod.spawn_post_run_memory_jobs(); } +/// Pending turn launch staged by an event handler for the next outer-loop +/// iteration. Each variant carries the input needed by the corresponding +/// `Pod::*` entry point — `RunForNotification` carries none because +/// `pod.run_for_notification()` drains the NotifyBuffer on its own. +enum PendingRun { + Run(Vec), + InterruptAndRun(Vec), + RunForNotification, + Resume, +} + // --------------------------------------------------------------------------- // PodController — actor that owns a Pod // --------------------------------------------------------------------------- @@ -111,35 +122,23 @@ impl PodController { C: LlmClient + Clone + 'static, St: Store + Clone + 'static, { + // === 1. Initialization (channels / RuntimeDir / pod-immutable + // snapshots / SpawnedPodRegistry / alerter attach / + // bash-output scope) === let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>(); - let (method_tx, mut method_rx) = mpsc::channel::(32); + let (method_tx, method_rx) = mpsc::channel::(32); let (event_tx, _) = broadcast::channel::(256); let alerter = Alerter::new(event_tx.clone()); // Runtime directory is created before tool registration because // the spawn-tool factories need its socket path, and before the - // initial status/history writes because those writes consume the - // greeting we build after registration is complete. + // initial status/history writes consume the greeting we build + // after registration is complete. let runtime_dir = Arc::new(RuntimeDir::create(runtime_base, &pod.manifest().pod.name).await?); - // Snapshot pod-immutable values needed for tool factories so the - // mutable worker borrow below doesn't conflict with reads on `pod`. - let scope_handle = pod.scope().clone(); - let pwd_for_tools = pod.pwd().to_path_buf(); let spawner_name = pod.manifest().pod.name.clone(); - let spawner_model = pod.manifest().model.clone(); - let memory_config = pod.manifest().memory.clone(); - - // Parent callback socket (this Pod's own parent, used for - // `PodEvent` upward reports). `None` for top-level Pods. let self_parent_socket = pod.callback_socket().cloned(); - - // `SpawnedPodRegistry` is shared between the Pod-orchestration - // tools (registered below) and the main loop's `PodEvent` - // handler (added later in this function), so hoist its creation - // above the worker-borrow block. - let spawner_socket = runtime_dir.socket_path(); let spawned_registry = SpawnedPodRegistry::new(runtime_dir.clone()); // Hand the alerter to the Pod so internal operations (compaction, @@ -153,11 +152,7 @@ impl PodController { // Bash spills long outputs to a per-pod subdir under the runtime // dir. Push a recursive `allow(Read)` for that path into the // Pod's runtime scope so the agent can `Read` saved files - // without polluting the workspace. The Pod's SharedScope is the - // single source of truth — every ScopedFs (builtin tools, - // fs_view, compact worker) reads from it, and any future scope - // mutation (SpawnPod-style revoke, future GrantScope) - // propagates through it. + // without polluting the workspace. let bash_output_dir = runtime_dir.path().join("bash-output"); std::fs::create_dir_all(&bash_output_dir).map_err(|e| { std::io::Error::other(format!( @@ -172,205 +167,24 @@ impl PodController { }]) .map_err(std::io::Error::other)?; - // Stashed during tool registration below so we can attach a - // `PodFsView` to the shared state once the latter exists. - let fs_for_view: tools::ScopedFs; - let task_store = pod.task_store(); - let session_id_for_usage = pod.session_id().to_string(); + // === 2. Worker event bridge wiring === + wire_event_bridges_on_worker(&mut pod, &event_tx, &alerter); - let scope_change_sink = pod.scope_change_sink(); - - // Register event bridge callbacks on the worker - { - let worker = pod.worker_mut(); - - let tx = event_tx.clone(); - worker.on_turn_start(move |turn| { - let _ = tx.send(Event::TurnStart { turn }); - }); - - let tx = event_tx.clone(); - worker.on_turn_end(move |turn| { - let _ = tx.send(Event::TurnEnd { - turn, - result: TurnResult::Finished, - }); - }); - - let tx = event_tx.clone(); - worker.on_text_block(move |block| { - let tx_d = tx.clone(); - block.on_delta(move |text| { - let _ = tx_d.send(Event::TextDelta { - text: text.to_owned(), - }); - }); - let tx_s = tx.clone(); - block.on_stop(move |text| { - let _ = tx_s.send(Event::TextDone { - text: text.to_owned(), - }); - }); - }); - - let tx = event_tx.clone(); - worker.on_thinking_block(move |block| { - // Start fires unconditionally so the TUI can show - // "Thinking..." even when the provider doesn't emit - // plaintext deltas. - let _ = tx.send(Event::ThinkingStart); - let tx_d = tx.clone(); - block.on_delta(move |text| { - let _ = tx_d.send(Event::ThinkingDelta { - text: text.to_owned(), - }); - }); - let tx_s = tx.clone(); - block.on_stop(move |text| { - let _ = tx_s.send(Event::ThinkingDone { - text: text.to_owned(), - }); - }); - }); - - let tx = event_tx.clone(); - worker.on_tool_use_block(move |start, block| { - let _ = tx.send(Event::ToolCallStart { - id: start.id.clone(), - name: start.name.clone(), - }); - let id_for_delta = start.id.clone(); - let tx_d = tx.clone(); - block.on_delta(move |json| { - let _ = tx_d.send(Event::ToolCallArgsDelta { - id: id_for_delta.clone(), - json: json.to_owned(), - }); - }); - let tx_s = tx.clone(); - block.on_stop(move |call| { - let _ = tx_s.send(Event::ToolCallDone { - id: call.id.clone(), - name: call.name.clone(), - arguments: call.input.to_string(), - }); - }); - }); - - let tx = event_tx.clone(); - worker.on_tool_result(move |result| { - let _ = tx.send(Event::ToolResult { - id: result.tool_use_id.clone(), - summary: result.summary.clone(), - output: result.content.clone(), - is_error: result.is_error, - }); - }); - - let tx = event_tx.clone(); - worker.on_usage(move |event| { - let _ = tx.send(Event::Usage { - input_tokens: event.input_tokens, - output_tokens: event.output_tokens, - cache_read_input_tokens: event.cache_read_input_tokens, - }); - }); - - let tx = event_tx.clone(); - worker.on_error(move |event| { - let _ = tx.send(Event::Error { - code: ErrorCode::ProviderError, - message: event.message.clone(), - }); - }); - - let alerter_for_worker = alerter.clone(); - worker.on_warning(move |message| { - alerter_for_worker.alert(AlertLevel::Warn, AlertSource::Worker, message.to_owned()); - }); - - let tx = event_tx.clone(); - worker.on_history_append(move |item| { - if is_system_message_item(item) { - let value = serde_json::to_value(item).expect("Item is Serialize"); - let _ = tx.send(Event::SystemMessage { item: value }); - } - }); - - // Register the builtin file-manipulation tools (Read / Write / - // Edit / Glob / Grep / Bash). `ScopedFs` carries the pod- - // lifetime scope/pwd; `Tracker` is session-scoped — a fresh - // instance per controller spawn ensures state from a previous - // process lifetime cannot be reused after a resume. The tracker - // is also handed to the Pod itself so Pod-level operations (e.g. - // context compaction) can ask which files the agent has been - // touching. - // - // The Pod's SharedScope (already augmented with the - // bash-output Read rule above) is the single source of - // truth — every ScopedFs (builtin tools, fs_view, compact - // worker) reads from it, and any future scope mutation - // (SpawnPod-style revoke, future GrantScope) propagates - // through it. - let fs = - tools::ScopedFs::with_shared_scope(scope_handle.clone(), pwd_for_tools.clone()); - let tracker = tools::Tracker::new(); - // The same ScopedFs also powers the IPC `ListCompletions` - // query — keep a clone for the FS view we attach below, - // since the tools consume `fs` itself. - fs_for_view = fs.clone(); - worker.register_tools(tools::builtin_tools( - fs, - tracker.clone(), - task_store.clone(), - bash_output_dir, - )); - - // Memory subsystem opt-in. When `[memory]` is present in - // the manifest, register the memory-specific Read/Write/Edit - // tools that target `/memory/` and - // `/knowledge/` with their built-in linter. The - // companion deny rules on the generic CRUD scope were - // already applied during `Pod::from_manifest`. - if let Some(mem) = memory_config.as_ref() { - let layout = memory::WorkspaceLayout::resolve(mem, &pwd_for_tools); - let query_cfg = memory::tool::QueryConfig::from(mem); - worker.register_tool(memory::tool::read_tool_with_usage( - layout.clone(), - session_id_for_usage.clone(), - )); - worker.register_tool(memory::tool::write_tool(layout.clone())); - worker.register_tool(memory::tool::edit_tool(layout.clone())); - worker.register_tool(memory::tool::memory_query_tool(layout.clone(), query_cfg)); - worker.register_tool(memory::tool::knowledge_query_tool(layout, query_cfg)); - } - - // Pod-orchestration tools (SpawnPod + the four comm tools) - // share the Pod-scoped `SpawnedPodRegistry` hoisted above - // (also consumed by the main loop's `PodEvent` handler). - worker.register_tool(spawn_pod_tool( - spawner_name.clone(), - spawner_socket.clone(), - runtime_base.to_path_buf(), - pwd_for_tools, - spawned_registry.clone(), - self_parent_socket.clone(), - spawner_model.clone(), - scope_handle.clone(), - scope_change_sink.clone(), - )); - worker.register_tool(send_to_pod_tool(spawned_registry.clone())); - worker.register_tool(read_pod_output_tool(spawned_registry.clone())); - worker.register_tool(stop_pod_tool(spawned_registry.clone())); - worker.register_tool(list_pods_tool(spawned_registry.clone())); - pod.attach_tracker(tracker); - } + // === 3. Tool registration (builtin / memory / spawn-orchestration) === + let fs_for_view = register_pod_tools( + &mut pod, + bash_output_dir, + runtime_dir.socket_path(), + runtime_base.to_path_buf(), + spawned_registry.clone(), + ); // Materialise pending tool factories so the greeting reflects // the actual registered set instead of a hand-maintained mirror. pod.worker().tool_server_handle().flush_pending(); - // Greeting + initial runtime files now that the tool list is final. + // === 4. Initial runtime files + PodSharedState + PodHandle + + // SocketServer === let manifest_toml = toml::to_string_pretty(pod.manifest()).unwrap_or_default(); let greeting = build_greeting(&pod); let shared_state = Arc::new(PodSharedState::new( @@ -406,336 +220,521 @@ impl PodController { alerter: alerter.clone(), }; - // Start socket server (lives as a background task, cleaned up on - // drop via RuntimeDir). Kept alive by moving it into the - // controller task so it drops when that task ends. - let _socket_server = SocketServer::start(&handle).await?; + let socket_server = SocketServer::start(&handle).await?; + // === 5. controller_loop === // Clone cancel sender and notification buffer before moving pod - // into the controller task so the main loop can route - // `Method::Notify` into the buffer even while `pod` is held by - // an in-flight `run_for_notification` / `run` future. + // into the controller task so the in-flight turn can be reached + // via these handles while pod itself is borrowed by drive_turn. let cancel_tx = pod.worker_mut().cancel_sender(); let notify_buffer = pod.notify_buffer_handle(); - tokio::spawn(async move { - // Hold socket server alive for the lifetime of the controller task - let _socket_server = _socket_server; - - loop { - let method = match method_rx.recv().await { - Some(m) => m, - None => break, - }; - - match method { - Method::Run { input } => { - let status_before = shared_state.get_status(); - if status_before == PodStatus::Running { - let _ = event_tx.send(Event::Error { - code: ErrorCode::AlreadyRunning, - message: "Pod is already executing a turn".into(), - }); - continue; - } - let was_paused = status_before == PodStatus::Paused; - if let Err(e) = pod.validate_workflow_invocations(&input) { - let _ = event_tx.send(Event::Error { - code: ErrorCode::InvalidRequest, - message: e.to_string(), - }); - continue; - } - // Broadcast the accepted user message so every - // subscriber (including the submitter) can - // render the turn header + user line from a - // single source of truth. shared_state's - // `user_segments` is re-synced from `pod` after - // the run completes, so we don't push here. - let _ = event_tx.send(Event::UserMessage { - segments: input.clone(), - }); - set_controller_status( - &shared_state, - &runtime_dir, - &event_tx, - PodStatus::Running, - ) - .await; - - let run_future = async { - if was_paused { - pod.interrupt_and_run(input).await - } else { - pod.run(input).await - } - }; - let (new_status, shutdown) = run_with_cancel_support( - run_future, - &mut method_rx, - &event_tx, - &cancel_tx, - &shared_state, - ¬ify_buffer, - self_parent_socket.as_ref(), - &spawner_name, - &spawned_registry, - ) - .await; - - finish_controller_run( - &mut pod, - &shared_state, - &runtime_dir, - &event_tx, - new_status, - ) - .await; - - if shutdown { - let _ = event_tx.send(Event::Shutdown); - break; - } - } - - Method::Notify { message } => { - let _ = event_tx.send(Event::Notify { - message: message.clone(), - }); - pod.push_notify(message); - let status = shared_state.get_status(); - if status != PodStatus::Idle { - // RUNNING / Paused: the buffer push is the - // entire operation; an in-flight turn (or the - // next Resume/Run) will drain the buffer - // at its next pre_llm_request. - continue; - } - // IDLE: auto-start a turn so the LLM sees the - // buffered notification(s) without a human Run. - set_controller_status( - &shared_state, - &runtime_dir, - &event_tx, - PodStatus::Running, - ) - .await; - - let (new_status, shutdown) = run_with_cancel_support( - pod.run_for_notification(), - &mut method_rx, - &event_tx, - &cancel_tx, - &shared_state, - ¬ify_buffer, - self_parent_socket.as_ref(), - &spawner_name, - &spawned_registry, - ) - .await; - - finish_controller_run( - &mut pod, - &shared_state, - &runtime_dir, - &event_tx, - new_status, - ) - .await; - - if shutdown { - let _ = event_tx.send(Event::Shutdown); - break; - } - } - - Method::Resume => { - if shared_state.get_status() != PodStatus::Paused { - let _ = event_tx.send(Event::Error { - code: ErrorCode::NotPaused, - message: "Pod is not paused".into(), - }); - continue; - } - set_controller_status( - &shared_state, - &runtime_dir, - &event_tx, - PodStatus::Running, - ) - .await; - - let (new_status, shutdown) = run_with_cancel_support( - pod.resume(), - &mut method_rx, - &event_tx, - &cancel_tx, - &shared_state, - ¬ify_buffer, - self_parent_socket.as_ref(), - &spawner_name, - &spawned_registry, - ) - .await; - - finish_controller_run( - &mut pod, - &shared_state, - &runtime_dir, - &event_tx, - new_status, - ) - .await; - - if shutdown { - let _ = event_tx.send(Event::Shutdown); - break; - } - } - - Method::Cancel => { - let _ = event_tx.send(Event::Error { - code: ErrorCode::NotRunning, - message: "Pod is not running".into(), - }); - } - - Method::Pause => { - // Already paused → idempotent no-op. Otherwise the - // Pod is Idle (Running turns go through - // `run_with_cancel_support`, not this outer match), so - // there is nothing to pause. - let status = shared_state.get_status(); - if status != PodStatus::Paused { - let _ = event_tx.send(Event::Error { - code: ErrorCode::NotRunning, - message: "Pod is not running".into(), - }); - } - } - - Method::Shutdown => { - let _ = event_tx.send(Event::Shutdown); - break; - } - - // GetHistory / ListCompletions are handled at the socket - // layer (direct response). If they somehow reach the - // controller, ignore them. - Method::GetHistory | Method::ListCompletions { .. } => {} - - Method::PodEvent(event) => { - // Echo the received event to all subscribers so - // every client sees the input that drove any - // following auto-kicked turn. - let _ = event_tx.send(Event::PodEvent(event.clone())); - // (1) system side effects — idempotent and - // tolerant of out-of-order delivery (e.g. - // `TurnEnded` arriving after `ShutDown`). - crate::ipc::event::apply_event_side_effects( - &event, - &spawned_registry, - &spawner_name, - &self_parent_socket, - ) - .await; - // (2) render a one-line summary and push it - // into the notification buffer; the next LLM - // request will inject it as a system message - // via `PodInterceptor::pre_llm_request`. - let text = crate::ipc::event::render_event(&event); - pod.push_notify(text); - // Auto-kick a turn if the Pod is idle so the - // notification is not stranded. Matches the - // `Method::Notify` idle path. - if shared_state.get_status() == PodStatus::Idle { - set_controller_status( - &shared_state, - &runtime_dir, - &event_tx, - PodStatus::Running, - ) - .await; - - let (new_status, shutdown) = run_with_cancel_support( - pod.run_for_notification(), - &mut method_rx, - &event_tx, - &cancel_tx, - &shared_state, - ¬ify_buffer, - self_parent_socket.as_ref(), - &spawner_name, - &spawned_registry, - ) - .await; - - finish_controller_run( - &mut pod, - &shared_state, - &runtime_dir, - &event_tx, - new_status, - ) - .await; - - if shutdown { - let _ = event_tx.send(Event::Shutdown); - break; - } - } - } - } - } - - // Background memory jobs own extract/consolidate workers after a - // turn completes. Join them before the controller task exits so - // staging writes and consolidation cleanups are not abandoned. - pod.wait_for_memory_jobs().await; - - // Report upward that this Pod is stopping before the - // controller task exits. Awaited (not fire-and-forget): - // after `shutdown_tx.send` the process may exit quickly, - // and a spawned task would be killed mid-send. The - // `connect_and_send` helper enforces a 5 s timeout so a - // stuck parent cannot block process exit indefinitely. - if let Some(parent) = self_parent_socket.as_ref() { - if let Err(e) = crate::ipc::event::send_pod_event( - parent, - protocol::PodEvent::ShutDown { - pod_name: spawner_name.clone(), - }, - ) - .await - { - tracing::warn!(error = %e, "ShutDown PodEvent send failed"); - } - } - - let _ = shutdown_tx.send(()); - }); + tokio::spawn(controller_loop( + pod, + method_rx, + event_tx, + shared_state, + runtime_dir, + cancel_tx, + notify_buffer, + self_parent_socket, + spawner_name, + spawned_registry, + shutdown_tx, + socket_server, + )); Ok((handle, shutdown_rx)) } } -/// Runs a Pod future while concurrently processing incoming methods. -/// -/// Returns `(final_status, shutdown_requested)`. +/// Wire the per-event broadcast bridges on the Pod's Worker. Each callback +/// re-publishes a worker-level signal as a `protocol::Event` on `event_tx` +/// so subscribers (TUI, socket clients) get a single typed stream. +fn wire_event_bridges_on_worker( + pod: &mut Pod, + event_tx: &broadcast::Sender, + alerter: &Alerter, +) where + C: LlmClient + Clone + 'static, + St: Store + Clone + 'static, +{ + let worker = pod.worker_mut(); + + let tx = event_tx.clone(); + worker.on_turn_start(move |turn| { + let _ = tx.send(Event::TurnStart { turn }); + }); + + let tx = event_tx.clone(); + worker.on_turn_end(move |turn| { + let _ = tx.send(Event::TurnEnd { + turn, + result: TurnResult::Finished, + }); + }); + + let tx = event_tx.clone(); + worker.on_text_block(move |block| { + let tx_d = tx.clone(); + block.on_delta(move |text| { + let _ = tx_d.send(Event::TextDelta { + text: text.to_owned(), + }); + }); + let tx_s = tx.clone(); + block.on_stop(move |text| { + let _ = tx_s.send(Event::TextDone { + text: text.to_owned(), + }); + }); + }); + + let tx = event_tx.clone(); + worker.on_thinking_block(move |block| { + // Start fires unconditionally so the TUI can show "Thinking..." + // even when the provider doesn't emit plaintext deltas. + let _ = tx.send(Event::ThinkingStart); + let tx_d = tx.clone(); + block.on_delta(move |text| { + let _ = tx_d.send(Event::ThinkingDelta { + text: text.to_owned(), + }); + }); + let tx_s = tx.clone(); + block.on_stop(move |text| { + let _ = tx_s.send(Event::ThinkingDone { + text: text.to_owned(), + }); + }); + }); + + let tx = event_tx.clone(); + worker.on_tool_use_block(move |start, block| { + let _ = tx.send(Event::ToolCallStart { + id: start.id.clone(), + name: start.name.clone(), + }); + let id_for_delta = start.id.clone(); + let tx_d = tx.clone(); + block.on_delta(move |json| { + let _ = tx_d.send(Event::ToolCallArgsDelta { + id: id_for_delta.clone(), + json: json.to_owned(), + }); + }); + let tx_s = tx.clone(); + block.on_stop(move |call| { + let _ = tx_s.send(Event::ToolCallDone { + id: call.id.clone(), + name: call.name.clone(), + arguments: call.input.to_string(), + }); + }); + }); + + let tx = event_tx.clone(); + worker.on_tool_result(move |result| { + let _ = tx.send(Event::ToolResult { + id: result.tool_use_id.clone(), + summary: result.summary.clone(), + output: result.content.clone(), + is_error: result.is_error, + }); + }); + + let tx = event_tx.clone(); + worker.on_usage(move |event| { + let _ = tx.send(Event::Usage { + input_tokens: event.input_tokens, + output_tokens: event.output_tokens, + cache_read_input_tokens: event.cache_read_input_tokens, + }); + }); + + let tx = event_tx.clone(); + worker.on_error(move |event| { + let _ = tx.send(Event::Error { + code: ErrorCode::ProviderError, + message: event.message.clone(), + }); + }); + + let alerter_for_worker = alerter.clone(); + worker.on_warning(move |message| { + alerter_for_worker.alert(AlertLevel::Warn, AlertSource::Worker, message.to_owned()); + }); + + let tx = event_tx.clone(); + worker.on_history_append(move |item| { + if is_system_message_item(item) { + let value = serde_json::to_value(item).expect("Item is Serialize"); + let _ = tx.send(Event::SystemMessage { item: value }); + } + }); +} + +/// Register the builtin file-manipulation tools, optional memory tools, +/// and the Pod-orchestration tools (SpawnPod + comm) on the Pod's +/// Worker. Returns the `ScopedFs` clone used to attach a `PodFsView` to +/// the shared state. +fn register_pod_tools( + pod: &mut Pod, + bash_output_dir: PathBuf, + spawner_socket: PathBuf, + runtime_base: PathBuf, + spawned_registry: Arc, +) -> tools::ScopedFs +where + C: LlmClient + Clone + 'static, + St: Store + Clone + 'static, +{ + // Pod-immutable snapshots taken before the mutable worker borrow + // below so the worker borrow doesn't conflict with reads on `pod`. + let scope_handle = pod.scope().clone(); + let pwd = pod.pwd().to_path_buf(); + let task_store = pod.task_store(); + let session_id_for_usage = pod.session_id().to_string(); + let scope_change_sink = pod.scope_change_sink(); + let memory_config = pod.manifest().memory.clone(); + let spawner_name = pod.manifest().pod.name.clone(); + let spawner_model = pod.manifest().model.clone(); + let self_parent_socket = pod.callback_socket().cloned(); + + let worker = pod.worker_mut(); + + // The Pod's SharedScope (already augmented with the bash-output + // Read rule by the caller) is the single source of truth — every + // ScopedFs (builtin tools, fs_view, compact worker) reads from it, + // and any future scope mutation (SpawnPod-style revoke, future + // GrantScope) propagates through it. + let fs = tools::ScopedFs::with_shared_scope(scope_handle.clone(), pwd.clone()); + let tracker = tools::Tracker::new(); + // Same ScopedFs also powers the IPC `ListCompletions` query — keep + // a clone for the FS view we attach below, since the tools consume + // `fs` itself. + let fs_for_view = fs.clone(); + worker.register_tools(tools::builtin_tools( + fs, + tracker.clone(), + task_store, + bash_output_dir, + )); + + // Memory subsystem opt-in. When `[memory]` is present in the + // manifest, register the memory-specific Read/Write/Edit tools that + // target `/memory/` and `/knowledge/` with + // their built-in linter. Companion deny rules on the generic CRUD + // scope were already applied during `Pod::from_manifest`. + if let Some(mem) = memory_config.as_ref() { + let layout = memory::WorkspaceLayout::resolve(mem, &pwd); + let query_cfg = memory::tool::QueryConfig::from(mem); + worker.register_tool(memory::tool::read_tool_with_usage( + layout.clone(), + session_id_for_usage, + )); + worker.register_tool(memory::tool::write_tool(layout.clone())); + worker.register_tool(memory::tool::edit_tool(layout.clone())); + worker.register_tool(memory::tool::memory_query_tool(layout.clone(), query_cfg)); + worker.register_tool(memory::tool::knowledge_query_tool(layout, query_cfg)); + } + + // Pod-orchestration tools (SpawnPod + the four comm tools) share + // the Pod-scoped `SpawnedPodRegistry` (also consumed by the main + // loop's `PodEvent` handler). + worker.register_tool(spawn_pod_tool( + spawner_name, + spawner_socket, + runtime_base, + pwd, + spawned_registry.clone(), + self_parent_socket, + spawner_model, + scope_handle, + scope_change_sink, + )); + worker.register_tool(send_to_pod_tool(spawned_registry.clone())); + worker.register_tool(read_pod_output_tool(spawned_registry.clone())); + worker.register_tool(stop_pod_tool(spawned_registry.clone())); + worker.register_tool(list_pods_tool(spawned_registry)); + pod.attach_tracker(tracker); + fs_for_view +} + +/// Idle/Paused event loop. Each iteration either fires a staged +/// `PendingRun` (delegating to [`drive_turn`] for the Running phase) or +/// waits for the next `Method`. Method handlers stop at "update state + +/// stage `pending`"; the loop's top-of-iteration block owns the +/// status-flip → run → finish sequence so it lives in exactly one +/// place. +#[allow(clippy::too_many_arguments)] +async fn controller_loop( + mut pod: Pod, + mut method_rx: mpsc::Receiver, + event_tx: broadcast::Sender, + shared_state: Arc, + runtime_dir: Arc, + cancel_tx: mpsc::Sender<()>, + notify_buffer: NotifyBuffer, + self_parent_socket: Option, + spawner_name: String, + spawned_registry: Arc, + shutdown_tx: oneshot::Sender<()>, + socket_server: SocketServer, +) where + C: LlmClient + Clone + 'static, + St: Store + Clone + 'static, +{ + // Hold socket server alive for the lifetime of the controller task. + let _socket_server = socket_server; + + let mut pending: Option = None; + + loop { + // Top-of-iteration: if an event handler staged a run, fire it + // here so the status flip → drive_turn → finish sequence lives + // in one place, regardless of which Method caused it. + if let Some(run) = pending.take() { + set_controller_status(&shared_state, &runtime_dir, &event_tx, PodStatus::Running).await; + let (new_status, shutdown) = match run { + PendingRun::Run(input) => { + drive_turn( + pod.run(input), + &mut method_rx, + &event_tx, + &cancel_tx, + &shared_state, + ¬ify_buffer, + self_parent_socket.as_ref(), + &spawner_name, + &spawned_registry, + ) + .await + } + PendingRun::InterruptAndRun(input) => { + drive_turn( + pod.interrupt_and_run(input), + &mut method_rx, + &event_tx, + &cancel_tx, + &shared_state, + ¬ify_buffer, + self_parent_socket.as_ref(), + &spawner_name, + &spawned_registry, + ) + .await + } + PendingRun::RunForNotification => { + drive_turn( + pod.run_for_notification(), + &mut method_rx, + &event_tx, + &cancel_tx, + &shared_state, + ¬ify_buffer, + self_parent_socket.as_ref(), + &spawner_name, + &spawned_registry, + ) + .await + } + PendingRun::Resume => { + drive_turn( + pod.resume(), + &mut method_rx, + &event_tx, + &cancel_tx, + &shared_state, + ¬ify_buffer, + self_parent_socket.as_ref(), + &spawner_name, + &spawned_registry, + ) + .await + } + }; + finish_controller_run(&mut pod, &shared_state, &runtime_dir, &event_tx, new_status) + .await; + if shutdown { + let _ = event_tx.send(Event::Shutdown); + break; + } + continue; + } + + let method = match method_rx.recv().await { + Some(m) => m, + None => break, + }; + + match method { + Method::Run { input } => { + let status_before = shared_state.get_status(); + if status_before == PodStatus::Running { + // Defensive: the inner select! inside drive_turn + // already rejects `Run` while a turn is live, so + // this branch is only reachable across a race window + // around status flips. + let _ = event_tx.send(Event::Error { + code: ErrorCode::AlreadyRunning, + message: "Pod is already executing a turn".into(), + }); + continue; + } + if let Err(e) = pod.validate_workflow_invocations(&input) { + let _ = event_tx.send(Event::Error { + code: ErrorCode::InvalidRequest, + message: e.to_string(), + }); + continue; + } + // Broadcast the accepted user message so every + // subscriber (including the submitter) can render the + // turn header + user line from a single source of + // truth. shared_state's `user_segments` is re-synced + // from `pod` after the run completes, so we don't push + // here. + let _ = event_tx.send(Event::UserMessage { + segments: input.clone(), + }); + pending = Some(if status_before == PodStatus::Paused { + PendingRun::InterruptAndRun(input) + } else { + PendingRun::Run(input) + }); + } + + Method::Notify { message } => { + let _ = event_tx.send(Event::Notify { + message: message.clone(), + }); + pod.push_notify(message); + // RUNNING / Paused: the buffer push is the entire + // operation; an in-flight turn (or the next + // Resume/Run) will drain it at its next + // pre_llm_request. IDLE: auto-start a turn so the LLM + // sees the buffered notification(s) without a human + // Run. + if shared_state.get_status() == PodStatus::Idle { + pending = Some(PendingRun::RunForNotification); + } + } + + Method::Resume => { + if shared_state.get_status() != PodStatus::Paused { + let _ = event_tx.send(Event::Error { + code: ErrorCode::NotPaused, + message: "Pod is not paused".into(), + }); + continue; + } + pending = Some(PendingRun::Resume); + } + + Method::Cancel => { + let _ = event_tx.send(Event::Error { + code: ErrorCode::NotRunning, + message: "Pod is not running".into(), + }); + } + + Method::Pause => { + // Already paused → idempotent no-op. Otherwise the + // Pod is Idle (Running turns go through `drive_turn`, + // not this outer match), so there is nothing to pause. + if shared_state.get_status() != PodStatus::Paused { + let _ = event_tx.send(Event::Error { + code: ErrorCode::NotRunning, + message: "Pod is not running".into(), + }); + } + } + + Method::Shutdown => { + let _ = event_tx.send(Event::Shutdown); + break; + } + + // GetHistory / ListCompletions are handled at the socket + // layer (direct response). If they reach the controller, + // ignore them. + Method::GetHistory | Method::ListCompletions { .. } => {} + + Method::PodEvent(event) => { + // Echo the received event to all subscribers so every + // client sees the input that drove any following + // auto-kicked turn. + let _ = event_tx.send(Event::PodEvent(event.clone())); + // (1) system side effects — idempotent and tolerant of + // out-of-order delivery (e.g. `TurnEnded` arriving + // after `ShutDown`). + crate::ipc::event::apply_event_side_effects( + &event, + &spawned_registry, + &spawner_name, + &self_parent_socket, + ) + .await; + // (2) render a one-line summary and push it into the + // notification buffer; the next LLM request will + // inject it as a system message via + // `PodInterceptor::pre_llm_request`. + pod.push_notify(crate::ipc::event::render_event(&event)); + // Auto-kick a turn if the Pod is idle so the + // notification is not stranded. Matches the + // `Method::Notify` idle path. + if shared_state.get_status() == PodStatus::Idle { + pending = Some(PendingRun::RunForNotification); + } + } + } + } + + // Background memory jobs own extract/consolidate workers after a + // turn completes. Join them before the controller task exits so + // staging writes and consolidation cleanups are not abandoned. + pod.wait_for_memory_jobs().await; + + // Report upward that this Pod is stopping before the controller + // task exits. Awaited (not fire-and-forget): after `shutdown_tx.send` + // the process may exit quickly, and a spawned task would be killed + // mid-send. The `connect_and_send` helper enforces a 5 s timeout so + // a stuck parent cannot block process exit indefinitely. + if let Some(parent) = self_parent_socket.as_ref() { + if let Err(e) = crate::ipc::event::send_pod_event( + parent, + protocol::PodEvent::ShutDown { + pod_name: spawner_name.clone(), + }, + ) + .await + { + tracing::warn!(error = %e, "ShutDown PodEvent send failed"); + } + } + + let _ = shutdown_tx.send(()); +} + +/// Drives a Pod future (one in-flight turn) while concurrently +/// processing incoming methods through an inner select! arm. Returns +/// `(final_status, shutdown_requested)`. /// /// `parent_socket` / `self_name` drive upward `PodEvent` reports /// (`TurnEnded` on a clean Finished, `Errored` on a worker failure). /// `None` parent skips the send (top-level Pod). Transient method /// rejections such as `AlreadyRunning` are intentionally NOT reported /// as `Errored` — only the worker-execution `Err` branch below fires. -async fn run_with_cancel_support( +#[allow(clippy::too_many_arguments)] +async fn drive_turn( pod_future: F, method_rx: &mut mpsc::Receiver, event_tx: &broadcast::Sender, cancel_tx: &mpsc::Sender<()>, shared_state: &Arc, notify_buffer: &NotifyBuffer, - parent_socket: Option<&std::path::PathBuf>, + parent_socket: Option<&PathBuf>, self_name: &str, spawned_registry: &Arc, ) -> (PodStatus, bool) diff --git a/tickets/pod-event-loop-dispatch.md b/tickets/pod-event-loop-dispatch.md deleted file mode 100644 index 377974e5..00000000 --- a/tickets/pod-event-loop-dispatch.md +++ /dev/null @@ -1,65 +0,0 @@ -# Pod: イベントハンドラからターン起動を分離する - -## 背景 - -`crates/pod/src/controller.rs` の controller タスクは、 outer loop の Method ハンドラ内に **イベント処理とターン起動を同居** させている。 - -該当する箇所: - -- `Method::Run` → outer arm 内で `run_with_cancel_support(pod.run(...)).await` と `finish_controller_run.await` を呼ぶ -- `Method::Notify` (Idle) → outer arm 内で `set_controller_status(Running).await` のあと `run_with_cancel_support(pod.run_for_notification()).await` / `finish_controller_run.await` -- `Method::PodEvent` (Idle) → 同上 (副作用処理 `apply_event_side_effects.await` も同じ arm 内) -- `Method::Resume` → 同上 - -ハンドラ自身が「ターンを丸ごと await する長時間処理」 を抱えているため、 以下が起きる: - -- 同じターン起動コード (`set_controller_status` → `run_with_cancel_support` → `finish_controller_run`) が 4 箇所に重複する -- ターン起動の発火元 (Run / Notify / PodEvent / Resume) と起動本体が分離されていないため、 起動条件の変更が複数 arm に波及する -- ターン起動に必要な前処理 (history への user message append / NotifyBuffer への push / status 遷移) も各 arm に分散している - -ターン起動を outer loop の周回トップに集約し、 各イベントハンドラは「次のターンに渡す入力をその場で確定 (history append / NotifyBuffer push) + `needs_run` フラグセット」 までに留めることで、 起動コードの重複と入力経路の分散をなくす。 - -加えて、 `PodController::spawn` 自体が約 615 行に膨らんでおり、「組み立て (channel/runtime_dir/Worker hook 配線/ツール登録/PodHandle 構築)」と「実行ループ (controller タスク)」が単一関数に同居している。 outer loop を上記の形に書き直すタイミングで、 spawn の構造分解も同時に行う方が、 重複コードの統合先が見やすくなる。 - -なお、 別途観測されている「auto-kick されたターンの内部で controller がデッドロックする現象」 はこの整理では解決しない (inner select! のアーム内 await が pending する経路は構造上残る)。 根本原因の特定は別チケットの対象。 - -## 要件 - -- イベントハンドラは **「状態更新と `needs_run` フラグ立て」 まで** にとどめる。ハンドラ内で `run_with_cancel_support.await` / `finish_controller_run.await` を呼ばない -- outer loop の各周回はまず `needs_run` を評価し、 立っていればターン起動 (`run_with_cancel_support` → `finish_controller_run`) を実行してフラグを降ろす。 立っていなければ `method_rx.recv().await` で次の Method を待つ -- 既存の inner select! によるターン中の Method 並行受信は維持する。 ターン本体の借用構造 (`&mut Pod` を Worker が抱える) も変更しない -- `needs_run` を立てる契機は最低限以下を含める (= 現状 auto-kick している経路): - - `Method::Run` - - `Method::Resume` - - `Method::Notify` (Idle 時のみ) - - `Method::PodEvent` (Idle 時のみ) -- 「次のターンの起動意図と入力」 は `needs_run` を起動意図を持つ enum として表現する: - - `Run { input }` / `InterruptAndRun { input }` / `RunForNotification` / `Resume` - - `Method::Notify` / `Method::PodEvent` の Idle 経路は NotifyBuffer に push したうえで `RunForNotification` を立てる (現状の `pod.run_for_notification()` が NotifyBuffer から自動取得する挙動に乗るため、 enum に入力を載せる必要はない) -- `Worker::run` / `Pod::run` の入力受け取り API には触らない (interceptor の `on_prompt_submit` cancel / 書き換え / extras ordering の invariant に踏み込まない) -- Pause / Shutdown / Cancel はハンドラ内で完結する (フラグ化しない、 既存通り即時処理) -- Running 中に来た `Method::Notify` / `Method::PodEvent` の挙動 (NotifyBuffer に積むだけ、 副作用は実行) は変えない -- 上記改修に合わせて `PodController::spawn` を以下の責務単位に分解する: - - 初期化 (channel 群 / `RuntimeDir` / pod-immutable snapshot / `SpawnedPodRegistry` / `alerter` 装着 / bash-output scope) - - Worker への event bridge コールバック配線 (`on_turn_*` / `on_*_block` / `on_tool_result` / `on_usage` / `on_warning` / `on_history_append` 等) - - ツール登録 (builtin / memory / spawn orchestration) - - 初期ファイル書き出し + `PodSharedState` / `PodHandle` 構築 + `SocketServer` 起動 - - `controller_loop` — Idle/Paused 状態の event loop + 後処理 (現 `tokio::spawn(async move { loop { ... } })` の本体) - - `drive_turn` — Running 状態の event loop。 現 `run_with_cancel_support` を改名し、 `controller_loop` と同格の関数として並べる (「cancel support」 という実装詳細名から役割ベースの名前に改める) -- 分解後の `spawn` はこの順序で各責務を呼び出す薄いフローになる。 Idle/Paused 側 (`controller_loop`) と Running 側 (`drive_turn`) の event loop が同格の関数として並ぶ形に揃える - -## 完了条件 - -- `Method::Run` / `Method::Resume` / `Method::Notify(Idle)` / `Method::PodEvent(Idle)` のいずれも、 ターン起動が outer loop の周回トップ 1 箇所に集約されており、 ハンドラ側にはターン起動コードが残っていない -- 各イベントハンドラの async body 内に「ターン丸ごと」 や `finish_controller_run` 等の長時間 await が無い -- 既存挙動が変わらない (どの Method で auto-kick されるか、 ターン中の Cancel / Pause / Shutdown が効くか、 NotifyBuffer に積まれた内容が次ターンで反映されるか、 ターン起動順序) -- `PodController::spawn` が責務単位の関数列を順に呼ぶ薄いフローになっており、 単一関数の中に組み立てと実行ループが同居していない -- Idle/Paused 状態の event loop (`controller_loop`) と Running 状態の event loop (`drive_turn`) が同格の関数として並んでおり、 名前が役割を表している (`run_with_cancel_support` の名前は残らない) -- `crates/pod/tests/controller_test.rs` の既存テストが通る - -## 範囲外 - -- 観測されたデッドロックの根本原因特定 (別チケット予定) -- `Pod` 構造体の `&mut self` borrow を分解して outer loop を完全 dispatcher 化する大改修 (将来検討) -- Cancel / Shutdown の専用チャネル化 (controller が固まっている時には別経路でも効かないため、 メリット薄と判断して見送り) -- `Worker::run` / `Pod::run` の入力なし API 化 (controller の dispatch を bool に圧縮するメリットに対し、 worker interceptor invariant を壊すコストが見合わない)