refactor: PodControllerの構造のリファクタリング
This commit is contained in:
parent
5a3e3f5994
commit
ba3655522b
1
TODO.md
1
TODO.md
|
|
@ -9,7 +9,6 @@
|
||||||
- Pod: 任意ターンからの Fork(複数ターン巻き戻しを汎用化) → [tickets/pod-session-fork.md](tickets/pod-session-fork.md)
|
- Pod: 任意ターンからの Fork(複数ターン巻き戻しを汎用化) → [tickets/pod-session-fork.md](tickets/pod-session-fork.md)
|
||||||
- Pod: 子→親の TurnEnded/Errored callback を親由来ターンのみに絞る → [tickets/pod-parent-turn-callback.md](tickets/pod-parent-turn-callback.md)
|
- Pod: 子→親の TurnEnded/Errored callback を親由来ターンのみに絞る → [tickets/pod-parent-turn-callback.md](tickets/pod-parent-turn-callback.md)
|
||||||
- Pod: セッションログをバックエンドにした Pod 単位の永続化 → [tickets/pod-persistent-state.md](tickets/pod-persistent-state.md)
|
- Pod: セッションログをバックエンドにした Pod 単位の永続化 → [tickets/pod-persistent-state.md](tickets/pod-persistent-state.md)
|
||||||
- Pod: イベントハンドラからターン起動を分離(描画ループ式 dispatch) → [tickets/pod-event-loop-dispatch.md](tickets/pod-event-loop-dispatch.md)
|
|
||||||
- 永続化層のセマンティック整理 → [tickets/persistence-semantics.md](tickets/persistence-semantics.md)
|
- 永続化層のセマンティック整理 → [tickets/persistence-semantics.md](tickets/persistence-semantics.md)
|
||||||
- Exchange / Turn / Call セマンティクス整理 → [tickets/exchange-turn-call-semantics.md](tickets/exchange-turn-call-semantics.md)
|
- Exchange / Turn / Call セマンティクス整理 → [tickets/exchange-turn-call-semantics.md](tickets/exchange-turn-call-semantics.md)
|
||||||
- llm-worker のエラー耐性
|
- llm-worker のエラー耐性
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,4 @@
|
||||||
use std::path::Path;
|
use std::path::{Path, PathBuf};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use llm_worker::WorkerError;
|
use llm_worker::WorkerError;
|
||||||
|
|
@ -19,7 +19,7 @@ use crate::spawn::comm_tools::{
|
||||||
use crate::spawn::registry::SpawnedPodRegistry;
|
use crate::spawn::registry::SpawnedPodRegistry;
|
||||||
use crate::spawn::tool::spawn_pod_tool;
|
use crate::spawn::tool::spawn_pod_tool;
|
||||||
use protocol::{
|
use protocol::{
|
||||||
AlertLevel, AlertSource, ErrorCode, Event, Method, PodStatus, RunResult, TurnResult,
|
AlertLevel, AlertSource, ErrorCode, Event, Method, PodStatus, RunResult, Segment, TurnResult,
|
||||||
};
|
};
|
||||||
|
|
||||||
fn is_system_message_item(item: &Item) -> bool {
|
fn is_system_message_item(item: &Item) -> bool {
|
||||||
|
|
@ -94,6 +94,17 @@ async fn finish_controller_run<C, St>(
|
||||||
pod.spawn_post_run_memory_jobs();
|
pod.spawn_post_run_memory_jobs();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Pending turn launch staged by an event handler for the next outer-loop
|
||||||
|
/// iteration. Each variant carries the input needed by the corresponding
|
||||||
|
/// `Pod::*` entry point — `RunForNotification` carries none because
|
||||||
|
/// `pod.run_for_notification()` drains the NotifyBuffer on its own.
|
||||||
|
enum PendingRun {
|
||||||
|
Run(Vec<Segment>),
|
||||||
|
InterruptAndRun(Vec<Segment>),
|
||||||
|
RunForNotification,
|
||||||
|
Resume,
|
||||||
|
}
|
||||||
|
|
||||||
// ---------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------
|
||||||
// PodController — actor that owns a Pod
|
// PodController — actor that owns a Pod
|
||||||
// ---------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------
|
||||||
|
|
@ -111,35 +122,23 @@ impl PodController {
|
||||||
C: LlmClient + Clone + 'static,
|
C: LlmClient + Clone + 'static,
|
||||||
St: Store + Clone + 'static,
|
St: Store + Clone + 'static,
|
||||||
{
|
{
|
||||||
|
// === 1. Initialization (channels / RuntimeDir / pod-immutable
|
||||||
|
// snapshots / SpawnedPodRegistry / alerter attach /
|
||||||
|
// bash-output scope) ===
|
||||||
let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
|
let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
|
||||||
let (method_tx, mut method_rx) = mpsc::channel::<Method>(32);
|
let (method_tx, method_rx) = mpsc::channel::<Method>(32);
|
||||||
let (event_tx, _) = broadcast::channel::<Event>(256);
|
let (event_tx, _) = broadcast::channel::<Event>(256);
|
||||||
let alerter = Alerter::new(event_tx.clone());
|
let alerter = Alerter::new(event_tx.clone());
|
||||||
|
|
||||||
// Runtime directory is created before tool registration because
|
// Runtime directory is created before tool registration because
|
||||||
// the spawn-tool factories need its socket path, and before the
|
// the spawn-tool factories need its socket path, and before the
|
||||||
// initial status/history writes because those writes consume the
|
// initial status/history writes consume the greeting we build
|
||||||
// greeting we build after registration is complete.
|
// after registration is complete.
|
||||||
let runtime_dir =
|
let runtime_dir =
|
||||||
Arc::new(RuntimeDir::create(runtime_base, &pod.manifest().pod.name).await?);
|
Arc::new(RuntimeDir::create(runtime_base, &pod.manifest().pod.name).await?);
|
||||||
|
|
||||||
// Snapshot pod-immutable values needed for tool factories so the
|
|
||||||
// mutable worker borrow below doesn't conflict with reads on `pod`.
|
|
||||||
let scope_handle = pod.scope().clone();
|
|
||||||
let pwd_for_tools = pod.pwd().to_path_buf();
|
|
||||||
let spawner_name = pod.manifest().pod.name.clone();
|
let spawner_name = pod.manifest().pod.name.clone();
|
||||||
let spawner_model = pod.manifest().model.clone();
|
|
||||||
let memory_config = pod.manifest().memory.clone();
|
|
||||||
|
|
||||||
// Parent callback socket (this Pod's own parent, used for
|
|
||||||
// `PodEvent` upward reports). `None` for top-level Pods.
|
|
||||||
let self_parent_socket = pod.callback_socket().cloned();
|
let self_parent_socket = pod.callback_socket().cloned();
|
||||||
|
|
||||||
// `SpawnedPodRegistry` is shared between the Pod-orchestration
|
|
||||||
// tools (registered below) and the main loop's `PodEvent`
|
|
||||||
// handler (added later in this function), so hoist its creation
|
|
||||||
// above the worker-borrow block.
|
|
||||||
let spawner_socket = runtime_dir.socket_path();
|
|
||||||
let spawned_registry = SpawnedPodRegistry::new(runtime_dir.clone());
|
let spawned_registry = SpawnedPodRegistry::new(runtime_dir.clone());
|
||||||
|
|
||||||
// Hand the alerter to the Pod so internal operations (compaction,
|
// Hand the alerter to the Pod so internal operations (compaction,
|
||||||
|
|
@ -153,11 +152,7 @@ impl PodController {
|
||||||
// Bash spills long outputs to a per-pod subdir under the runtime
|
// Bash spills long outputs to a per-pod subdir under the runtime
|
||||||
// dir. Push a recursive `allow(Read)` for that path into the
|
// dir. Push a recursive `allow(Read)` for that path into the
|
||||||
// Pod's runtime scope so the agent can `Read` saved files
|
// Pod's runtime scope so the agent can `Read` saved files
|
||||||
// without polluting the workspace. The Pod's SharedScope is the
|
// without polluting the workspace.
|
||||||
// single source of truth — every ScopedFs (builtin tools,
|
|
||||||
// fs_view, compact worker) reads from it, and any future scope
|
|
||||||
// mutation (SpawnPod-style revoke, future GrantScope)
|
|
||||||
// propagates through it.
|
|
||||||
let bash_output_dir = runtime_dir.path().join("bash-output");
|
let bash_output_dir = runtime_dir.path().join("bash-output");
|
||||||
std::fs::create_dir_all(&bash_output_dir).map_err(|e| {
|
std::fs::create_dir_all(&bash_output_dir).map_err(|e| {
|
||||||
std::io::Error::other(format!(
|
std::io::Error::other(format!(
|
||||||
|
|
@ -172,15 +167,97 @@ impl PodController {
|
||||||
}])
|
}])
|
||||||
.map_err(std::io::Error::other)?;
|
.map_err(std::io::Error::other)?;
|
||||||
|
|
||||||
// Stashed during tool registration below so we can attach a
|
// === 2. Worker event bridge wiring ===
|
||||||
// `PodFsView` to the shared state once the latter exists.
|
wire_event_bridges_on_worker(&mut pod, &event_tx, &alerter);
|
||||||
let fs_for_view: tools::ScopedFs;
|
|
||||||
let task_store = pod.task_store();
|
|
||||||
let session_id_for_usage = pod.session_id().to_string();
|
|
||||||
|
|
||||||
let scope_change_sink = pod.scope_change_sink();
|
// === 3. Tool registration (builtin / memory / spawn-orchestration) ===
|
||||||
|
let fs_for_view = register_pod_tools(
|
||||||
|
&mut pod,
|
||||||
|
bash_output_dir,
|
||||||
|
runtime_dir.socket_path(),
|
||||||
|
runtime_base.to_path_buf(),
|
||||||
|
spawned_registry.clone(),
|
||||||
|
);
|
||||||
|
|
||||||
// Register event bridge callbacks on the worker
|
// Materialise pending tool factories so the greeting reflects
|
||||||
|
// the actual registered set instead of a hand-maintained mirror.
|
||||||
|
pod.worker().tool_server_handle().flush_pending();
|
||||||
|
|
||||||
|
// === 4. Initial runtime files + PodSharedState + PodHandle +
|
||||||
|
// SocketServer ===
|
||||||
|
let manifest_toml = toml::to_string_pretty(pod.manifest()).unwrap_or_default();
|
||||||
|
let greeting = build_greeting(&pod);
|
||||||
|
let shared_state = Arc::new(PodSharedState::new(
|
||||||
|
pod.manifest().pod.name.clone(),
|
||||||
|
pod.session_id(),
|
||||||
|
manifest_toml.clone(),
|
||||||
|
greeting,
|
||||||
|
));
|
||||||
|
shared_state.update_history(pod.worker().history().to_vec());
|
||||||
|
shared_state.set_user_segments(pod.user_segments().to_vec());
|
||||||
|
shared_state.set_fs_view(crate::fs_view::PodFsView::new(fs_for_view));
|
||||||
|
shared_state.set_workflows(
|
||||||
|
pod.workflow_completions()
|
||||||
|
.into_iter()
|
||||||
|
.map(|slug| crate::shared_state::WorkflowCandidate { slug })
|
||||||
|
.collect(),
|
||||||
|
);
|
||||||
|
shared_state.set_knowledge(
|
||||||
|
pod.knowledge_completions()
|
||||||
|
.into_iter()
|
||||||
|
.map(|slug| crate::shared_state::KnowledgeCandidate { slug })
|
||||||
|
.collect(),
|
||||||
|
);
|
||||||
|
runtime_dir.write_manifest(&manifest_toml).await?;
|
||||||
|
runtime_dir.write_status(&shared_state).await?;
|
||||||
|
runtime_dir.write_history(&shared_state).await?;
|
||||||
|
|
||||||
|
let handle = PodHandle {
|
||||||
|
method_tx,
|
||||||
|
event_tx: event_tx.clone(),
|
||||||
|
shared_state: shared_state.clone(),
|
||||||
|
runtime_dir: runtime_dir.clone(),
|
||||||
|
alerter: alerter.clone(),
|
||||||
|
};
|
||||||
|
|
||||||
|
let socket_server = SocketServer::start(&handle).await?;
|
||||||
|
|
||||||
|
// === 5. controller_loop ===
|
||||||
|
// Clone cancel sender and notification buffer before moving pod
|
||||||
|
// into the controller task so the in-flight turn can be reached
|
||||||
|
// via these handles while pod itself is borrowed by drive_turn.
|
||||||
|
let cancel_tx = pod.worker_mut().cancel_sender();
|
||||||
|
let notify_buffer = pod.notify_buffer_handle();
|
||||||
|
|
||||||
|
tokio::spawn(controller_loop(
|
||||||
|
pod,
|
||||||
|
method_rx,
|
||||||
|
event_tx,
|
||||||
|
shared_state,
|
||||||
|
runtime_dir,
|
||||||
|
cancel_tx,
|
||||||
|
notify_buffer,
|
||||||
|
self_parent_socket,
|
||||||
|
spawner_name,
|
||||||
|
spawned_registry,
|
||||||
|
shutdown_tx,
|
||||||
|
socket_server,
|
||||||
|
));
|
||||||
|
|
||||||
|
Ok((handle, shutdown_rx))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Wire the per-event broadcast bridges on the Pod's Worker. Each callback
|
||||||
|
/// re-publishes a worker-level signal as a `protocol::Event` on `event_tx`
|
||||||
|
/// so subscribers (TUI, socket clients) get a single typed stream.
|
||||||
|
fn wire_event_bridges_on_worker<C, St>(
|
||||||
|
pod: &mut Pod<C, St>,
|
||||||
|
event_tx: &broadcast::Sender<Event>,
|
||||||
|
alerter: &Alerter,
|
||||||
|
) where
|
||||||
|
C: LlmClient + Clone + 'static,
|
||||||
|
St: Store + Clone + 'static,
|
||||||
{
|
{
|
||||||
let worker = pod.worker_mut();
|
let worker = pod.worker_mut();
|
||||||
|
|
||||||
|
|
@ -215,9 +292,8 @@ impl PodController {
|
||||||
|
|
||||||
let tx = event_tx.clone();
|
let tx = event_tx.clone();
|
||||||
worker.on_thinking_block(move |block| {
|
worker.on_thinking_block(move |block| {
|
||||||
// Start fires unconditionally so the TUI can show
|
// Start fires unconditionally so the TUI can show "Thinking..."
|
||||||
// "Thinking..." even when the provider doesn't emit
|
// even when the provider doesn't emit plaintext deltas.
|
||||||
// plaintext deltas.
|
|
||||||
let _ = tx.send(Event::ThinkingStart);
|
let _ = tx.send(Event::ThinkingStart);
|
||||||
let tx_d = tx.clone();
|
let tx_d = tx.clone();
|
||||||
block.on_delta(move |text| {
|
block.on_delta(move |text| {
|
||||||
|
|
@ -296,48 +372,66 @@ impl PodController {
|
||||||
let _ = tx.send(Event::SystemMessage { item: value });
|
let _ = tx.send(Event::SystemMessage { item: value });
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
}
|
||||||
|
|
||||||
// Register the builtin file-manipulation tools (Read / Write /
|
/// Register the builtin file-manipulation tools, optional memory tools,
|
||||||
// Edit / Glob / Grep / Bash). `ScopedFs` carries the pod-
|
/// and the Pod-orchestration tools (SpawnPod + comm) on the Pod's
|
||||||
// lifetime scope/pwd; `Tracker` is session-scoped — a fresh
|
/// Worker. Returns the `ScopedFs` clone used to attach a `PodFsView` to
|
||||||
// instance per controller spawn ensures state from a previous
|
/// the shared state.
|
||||||
// process lifetime cannot be reused after a resume. The tracker
|
fn register_pod_tools<C, St>(
|
||||||
// is also handed to the Pod itself so Pod-level operations (e.g.
|
pod: &mut Pod<C, St>,
|
||||||
// context compaction) can ask which files the agent has been
|
bash_output_dir: PathBuf,
|
||||||
// touching.
|
spawner_socket: PathBuf,
|
||||||
//
|
runtime_base: PathBuf,
|
||||||
// The Pod's SharedScope (already augmented with the
|
spawned_registry: Arc<SpawnedPodRegistry>,
|
||||||
// bash-output Read rule above) is the single source of
|
) -> tools::ScopedFs
|
||||||
// truth — every ScopedFs (builtin tools, fs_view, compact
|
where
|
||||||
// worker) reads from it, and any future scope mutation
|
C: LlmClient + Clone + 'static,
|
||||||
// (SpawnPod-style revoke, future GrantScope) propagates
|
St: Store + Clone + 'static,
|
||||||
// through it.
|
{
|
||||||
let fs =
|
// Pod-immutable snapshots taken before the mutable worker borrow
|
||||||
tools::ScopedFs::with_shared_scope(scope_handle.clone(), pwd_for_tools.clone());
|
// below so the worker borrow doesn't conflict with reads on `pod`.
|
||||||
|
let scope_handle = pod.scope().clone();
|
||||||
|
let pwd = pod.pwd().to_path_buf();
|
||||||
|
let task_store = pod.task_store();
|
||||||
|
let session_id_for_usage = pod.session_id().to_string();
|
||||||
|
let scope_change_sink = pod.scope_change_sink();
|
||||||
|
let memory_config = pod.manifest().memory.clone();
|
||||||
|
let spawner_name = pod.manifest().pod.name.clone();
|
||||||
|
let spawner_model = pod.manifest().model.clone();
|
||||||
|
let self_parent_socket = pod.callback_socket().cloned();
|
||||||
|
|
||||||
|
let worker = pod.worker_mut();
|
||||||
|
|
||||||
|
// The Pod's SharedScope (already augmented with the bash-output
|
||||||
|
// Read rule by the caller) is the single source of truth — every
|
||||||
|
// ScopedFs (builtin tools, fs_view, compact worker) reads from it,
|
||||||
|
// and any future scope mutation (SpawnPod-style revoke, future
|
||||||
|
// GrantScope) propagates through it.
|
||||||
|
let fs = tools::ScopedFs::with_shared_scope(scope_handle.clone(), pwd.clone());
|
||||||
let tracker = tools::Tracker::new();
|
let tracker = tools::Tracker::new();
|
||||||
// The same ScopedFs also powers the IPC `ListCompletions`
|
// Same ScopedFs also powers the IPC `ListCompletions` query — keep
|
||||||
// query — keep a clone for the FS view we attach below,
|
// a clone for the FS view we attach below, since the tools consume
|
||||||
// since the tools consume `fs` itself.
|
// `fs` itself.
|
||||||
fs_for_view = fs.clone();
|
let fs_for_view = fs.clone();
|
||||||
worker.register_tools(tools::builtin_tools(
|
worker.register_tools(tools::builtin_tools(
|
||||||
fs,
|
fs,
|
||||||
tracker.clone(),
|
tracker.clone(),
|
||||||
task_store.clone(),
|
task_store,
|
||||||
bash_output_dir,
|
bash_output_dir,
|
||||||
));
|
));
|
||||||
|
|
||||||
// Memory subsystem opt-in. When `[memory]` is present in
|
// Memory subsystem opt-in. When `[memory]` is present in the
|
||||||
// the manifest, register the memory-specific Read/Write/Edit
|
// manifest, register the memory-specific Read/Write/Edit tools that
|
||||||
// tools that target `<workspace>/memory/` and
|
// target `<workspace>/memory/` and `<workspace>/knowledge/` with
|
||||||
// `<workspace>/knowledge/` with their built-in linter. The
|
// their built-in linter. Companion deny rules on the generic CRUD
|
||||||
// companion deny rules on the generic CRUD scope were
|
// scope were already applied during `Pod::from_manifest`.
|
||||||
// already applied during `Pod::from_manifest`.
|
|
||||||
if let Some(mem) = memory_config.as_ref() {
|
if let Some(mem) = memory_config.as_ref() {
|
||||||
let layout = memory::WorkspaceLayout::resolve(mem, &pwd_for_tools);
|
let layout = memory::WorkspaceLayout::resolve(mem, &pwd);
|
||||||
let query_cfg = memory::tool::QueryConfig::from(mem);
|
let query_cfg = memory::tool::QueryConfig::from(mem);
|
||||||
worker.register_tool(memory::tool::read_tool_with_usage(
|
worker.register_tool(memory::tool::read_tool_with_usage(
|
||||||
layout.clone(),
|
layout.clone(),
|
||||||
session_id_for_usage.clone(),
|
session_id_for_usage,
|
||||||
));
|
));
|
||||||
worker.register_tool(memory::tool::write_tool(layout.clone()));
|
worker.register_tool(memory::tool::write_tool(layout.clone()));
|
||||||
worker.register_tool(memory::tool::edit_tool(layout.clone()));
|
worker.register_tool(memory::tool::edit_tool(layout.clone()));
|
||||||
|
|
@ -345,133 +439,67 @@ impl PodController {
|
||||||
worker.register_tool(memory::tool::knowledge_query_tool(layout, query_cfg));
|
worker.register_tool(memory::tool::knowledge_query_tool(layout, query_cfg));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Pod-orchestration tools (SpawnPod + the four comm tools)
|
// Pod-orchestration tools (SpawnPod + the four comm tools) share
|
||||||
// share the Pod-scoped `SpawnedPodRegistry` hoisted above
|
// the Pod-scoped `SpawnedPodRegistry` (also consumed by the main
|
||||||
// (also consumed by the main loop's `PodEvent` handler).
|
// loop's `PodEvent` handler).
|
||||||
worker.register_tool(spawn_pod_tool(
|
worker.register_tool(spawn_pod_tool(
|
||||||
spawner_name.clone(),
|
spawner_name,
|
||||||
spawner_socket.clone(),
|
spawner_socket,
|
||||||
runtime_base.to_path_buf(),
|
runtime_base,
|
||||||
pwd_for_tools,
|
pwd,
|
||||||
spawned_registry.clone(),
|
spawned_registry.clone(),
|
||||||
self_parent_socket.clone(),
|
self_parent_socket,
|
||||||
spawner_model.clone(),
|
spawner_model,
|
||||||
scope_handle.clone(),
|
scope_handle,
|
||||||
scope_change_sink.clone(),
|
scope_change_sink,
|
||||||
));
|
));
|
||||||
worker.register_tool(send_to_pod_tool(spawned_registry.clone()));
|
worker.register_tool(send_to_pod_tool(spawned_registry.clone()));
|
||||||
worker.register_tool(read_pod_output_tool(spawned_registry.clone()));
|
worker.register_tool(read_pod_output_tool(spawned_registry.clone()));
|
||||||
worker.register_tool(stop_pod_tool(spawned_registry.clone()));
|
worker.register_tool(stop_pod_tool(spawned_registry.clone()));
|
||||||
worker.register_tool(list_pods_tool(spawned_registry.clone()));
|
worker.register_tool(list_pods_tool(spawned_registry));
|
||||||
pod.attach_tracker(tracker);
|
pod.attach_tracker(tracker);
|
||||||
|
fs_for_view
|
||||||
}
|
}
|
||||||
|
|
||||||
// Materialise pending tool factories so the greeting reflects
|
/// Idle/Paused event loop. Each iteration either fires a staged
|
||||||
// the actual registered set instead of a hand-maintained mirror.
|
/// `PendingRun` (delegating to [`drive_turn`] for the Running phase) or
|
||||||
pod.worker().tool_server_handle().flush_pending();
|
/// waits for the next `Method`. Method handlers stop at "update state +
|
||||||
|
/// stage `pending`"; the loop's top-of-iteration block owns the
|
||||||
|
/// status-flip → run → finish sequence so it lives in exactly one
|
||||||
|
/// place.
|
||||||
|
#[allow(clippy::too_many_arguments)]
|
||||||
|
async fn controller_loop<C, St>(
|
||||||
|
mut pod: Pod<C, St>,
|
||||||
|
mut method_rx: mpsc::Receiver<Method>,
|
||||||
|
event_tx: broadcast::Sender<Event>,
|
||||||
|
shared_state: Arc<PodSharedState>,
|
||||||
|
runtime_dir: Arc<RuntimeDir>,
|
||||||
|
cancel_tx: mpsc::Sender<()>,
|
||||||
|
notify_buffer: NotifyBuffer,
|
||||||
|
self_parent_socket: Option<PathBuf>,
|
||||||
|
spawner_name: String,
|
||||||
|
spawned_registry: Arc<SpawnedPodRegistry>,
|
||||||
|
shutdown_tx: oneshot::Sender<()>,
|
||||||
|
socket_server: SocketServer,
|
||||||
|
) where
|
||||||
|
C: LlmClient + Clone + 'static,
|
||||||
|
St: Store + Clone + 'static,
|
||||||
|
{
|
||||||
|
// Hold socket server alive for the lifetime of the controller task.
|
||||||
|
let _socket_server = socket_server;
|
||||||
|
|
||||||
// Greeting + initial runtime files now that the tool list is final.
|
let mut pending: Option<PendingRun> = None;
|
||||||
let manifest_toml = toml::to_string_pretty(pod.manifest()).unwrap_or_default();
|
|
||||||
let greeting = build_greeting(&pod);
|
|
||||||
let shared_state = Arc::new(PodSharedState::new(
|
|
||||||
pod.manifest().pod.name.clone(),
|
|
||||||
pod.session_id(),
|
|
||||||
manifest_toml.clone(),
|
|
||||||
greeting,
|
|
||||||
));
|
|
||||||
shared_state.update_history(pod.worker().history().to_vec());
|
|
||||||
shared_state.set_user_segments(pod.user_segments().to_vec());
|
|
||||||
shared_state.set_fs_view(crate::fs_view::PodFsView::new(fs_for_view));
|
|
||||||
shared_state.set_workflows(
|
|
||||||
pod.workflow_completions()
|
|
||||||
.into_iter()
|
|
||||||
.map(|slug| crate::shared_state::WorkflowCandidate { slug })
|
|
||||||
.collect(),
|
|
||||||
);
|
|
||||||
shared_state.set_knowledge(
|
|
||||||
pod.knowledge_completions()
|
|
||||||
.into_iter()
|
|
||||||
.map(|slug| crate::shared_state::KnowledgeCandidate { slug })
|
|
||||||
.collect(),
|
|
||||||
);
|
|
||||||
runtime_dir.write_manifest(&manifest_toml).await?;
|
|
||||||
runtime_dir.write_status(&shared_state).await?;
|
|
||||||
runtime_dir.write_history(&shared_state).await?;
|
|
||||||
|
|
||||||
let handle = PodHandle {
|
|
||||||
method_tx,
|
|
||||||
event_tx: event_tx.clone(),
|
|
||||||
shared_state: shared_state.clone(),
|
|
||||||
runtime_dir: runtime_dir.clone(),
|
|
||||||
alerter: alerter.clone(),
|
|
||||||
};
|
|
||||||
|
|
||||||
// Start socket server (lives as a background task, cleaned up on
|
|
||||||
// drop via RuntimeDir). Kept alive by moving it into the
|
|
||||||
// controller task so it drops when that task ends.
|
|
||||||
let _socket_server = SocketServer::start(&handle).await?;
|
|
||||||
|
|
||||||
// Clone cancel sender and notification buffer before moving pod
|
|
||||||
// into the controller task so the main loop can route
|
|
||||||
// `Method::Notify` into the buffer even while `pod` is held by
|
|
||||||
// an in-flight `run_for_notification` / `run` future.
|
|
||||||
let cancel_tx = pod.worker_mut().cancel_sender();
|
|
||||||
let notify_buffer = pod.notify_buffer_handle();
|
|
||||||
|
|
||||||
tokio::spawn(async move {
|
|
||||||
// Hold socket server alive for the lifetime of the controller task
|
|
||||||
let _socket_server = _socket_server;
|
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let method = match method_rx.recv().await {
|
// Top-of-iteration: if an event handler staged a run, fire it
|
||||||
Some(m) => m,
|
// here so the status flip → drive_turn → finish sequence lives
|
||||||
None => break,
|
// in one place, regardless of which Method caused it.
|
||||||
};
|
if let Some(run) = pending.take() {
|
||||||
|
set_controller_status(&shared_state, &runtime_dir, &event_tx, PodStatus::Running).await;
|
||||||
match method {
|
let (new_status, shutdown) = match run {
|
||||||
Method::Run { input } => {
|
PendingRun::Run(input) => {
|
||||||
let status_before = shared_state.get_status();
|
drive_turn(
|
||||||
if status_before == PodStatus::Running {
|
pod.run(input),
|
||||||
let _ = event_tx.send(Event::Error {
|
|
||||||
code: ErrorCode::AlreadyRunning,
|
|
||||||
message: "Pod is already executing a turn".into(),
|
|
||||||
});
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
let was_paused = status_before == PodStatus::Paused;
|
|
||||||
if let Err(e) = pod.validate_workflow_invocations(&input) {
|
|
||||||
let _ = event_tx.send(Event::Error {
|
|
||||||
code: ErrorCode::InvalidRequest,
|
|
||||||
message: e.to_string(),
|
|
||||||
});
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
// Broadcast the accepted user message so every
|
|
||||||
// subscriber (including the submitter) can
|
|
||||||
// render the turn header + user line from a
|
|
||||||
// single source of truth. shared_state's
|
|
||||||
// `user_segments` is re-synced from `pod` after
|
|
||||||
// the run completes, so we don't push here.
|
|
||||||
let _ = event_tx.send(Event::UserMessage {
|
|
||||||
segments: input.clone(),
|
|
||||||
});
|
|
||||||
set_controller_status(
|
|
||||||
&shared_state,
|
|
||||||
&runtime_dir,
|
|
||||||
&event_tx,
|
|
||||||
PodStatus::Running,
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
|
|
||||||
let run_future = async {
|
|
||||||
if was_paused {
|
|
||||||
pod.interrupt_and_run(input).await
|
|
||||||
} else {
|
|
||||||
pod.run(input).await
|
|
||||||
}
|
|
||||||
};
|
|
||||||
let (new_status, shutdown) = run_with_cancel_support(
|
|
||||||
run_future,
|
|
||||||
&mut method_rx,
|
&mut method_rx,
|
||||||
&event_tx,
|
&event_tx,
|
||||||
&cancel_tx,
|
&cancel_tx,
|
||||||
|
|
@ -481,47 +509,24 @@ impl PodController {
|
||||||
&spawner_name,
|
&spawner_name,
|
||||||
&spawned_registry,
|
&spawned_registry,
|
||||||
)
|
)
|
||||||
.await;
|
.await
|
||||||
|
}
|
||||||
finish_controller_run(
|
PendingRun::InterruptAndRun(input) => {
|
||||||
&mut pod,
|
drive_turn(
|
||||||
&shared_state,
|
pod.interrupt_and_run(input),
|
||||||
&runtime_dir,
|
&mut method_rx,
|
||||||
&event_tx,
|
&event_tx,
|
||||||
new_status,
|
&cancel_tx,
|
||||||
)
|
|
||||||
.await;
|
|
||||||
|
|
||||||
if shutdown {
|
|
||||||
let _ = event_tx.send(Event::Shutdown);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Method::Notify { message } => {
|
|
||||||
let _ = event_tx.send(Event::Notify {
|
|
||||||
message: message.clone(),
|
|
||||||
});
|
|
||||||
pod.push_notify(message);
|
|
||||||
let status = shared_state.get_status();
|
|
||||||
if status != PodStatus::Idle {
|
|
||||||
// RUNNING / Paused: the buffer push is the
|
|
||||||
// entire operation; an in-flight turn (or the
|
|
||||||
// next Resume/Run) will drain the buffer
|
|
||||||
// at its next pre_llm_request.
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
// IDLE: auto-start a turn so the LLM sees the
|
|
||||||
// buffered notification(s) without a human Run.
|
|
||||||
set_controller_status(
|
|
||||||
&shared_state,
|
&shared_state,
|
||||||
&runtime_dir,
|
¬ify_buffer,
|
||||||
&event_tx,
|
self_parent_socket.as_ref(),
|
||||||
PodStatus::Running,
|
&spawner_name,
|
||||||
|
&spawned_registry,
|
||||||
)
|
)
|
||||||
.await;
|
.await
|
||||||
|
}
|
||||||
let (new_status, shutdown) = run_with_cancel_support(
|
PendingRun::RunForNotification => {
|
||||||
|
drive_turn(
|
||||||
pod.run_for_notification(),
|
pod.run_for_notification(),
|
||||||
&mut method_rx,
|
&mut method_rx,
|
||||||
&event_tx,
|
&event_tx,
|
||||||
|
|
@ -532,21 +537,88 @@ impl PodController {
|
||||||
&spawner_name,
|
&spawner_name,
|
||||||
&spawned_registry,
|
&spawned_registry,
|
||||||
)
|
)
|
||||||
.await;
|
.await
|
||||||
|
}
|
||||||
finish_controller_run(
|
PendingRun::Resume => {
|
||||||
&mut pod,
|
drive_turn(
|
||||||
&shared_state,
|
pod.resume(),
|
||||||
&runtime_dir,
|
&mut method_rx,
|
||||||
&event_tx,
|
&event_tx,
|
||||||
new_status,
|
&cancel_tx,
|
||||||
|
&shared_state,
|
||||||
|
¬ify_buffer,
|
||||||
|
self_parent_socket.as_ref(),
|
||||||
|
&spawner_name,
|
||||||
|
&spawned_registry,
|
||||||
)
|
)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
};
|
||||||
|
finish_controller_run(&mut pod, &shared_state, &runtime_dir, &event_tx, new_status)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
if shutdown {
|
if shutdown {
|
||||||
let _ = event_tx.send(Event::Shutdown);
|
let _ = event_tx.send(Event::Shutdown);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
let method = match method_rx.recv().await {
|
||||||
|
Some(m) => m,
|
||||||
|
None => break,
|
||||||
|
};
|
||||||
|
|
||||||
|
match method {
|
||||||
|
Method::Run { input } => {
|
||||||
|
let status_before = shared_state.get_status();
|
||||||
|
if status_before == PodStatus::Running {
|
||||||
|
// Defensive: the inner select! inside drive_turn
|
||||||
|
// already rejects `Run` while a turn is live, so
|
||||||
|
// this branch is only reachable across a race window
|
||||||
|
// around status flips.
|
||||||
|
let _ = event_tx.send(Event::Error {
|
||||||
|
code: ErrorCode::AlreadyRunning,
|
||||||
|
message: "Pod is already executing a turn".into(),
|
||||||
|
});
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if let Err(e) = pod.validate_workflow_invocations(&input) {
|
||||||
|
let _ = event_tx.send(Event::Error {
|
||||||
|
code: ErrorCode::InvalidRequest,
|
||||||
|
message: e.to_string(),
|
||||||
|
});
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
// Broadcast the accepted user message so every
|
||||||
|
// subscriber (including the submitter) can render the
|
||||||
|
// turn header + user line from a single source of
|
||||||
|
// truth. shared_state's `user_segments` is re-synced
|
||||||
|
// from `pod` after the run completes, so we don't push
|
||||||
|
// here.
|
||||||
|
let _ = event_tx.send(Event::UserMessage {
|
||||||
|
segments: input.clone(),
|
||||||
|
});
|
||||||
|
pending = Some(if status_before == PodStatus::Paused {
|
||||||
|
PendingRun::InterruptAndRun(input)
|
||||||
|
} else {
|
||||||
|
PendingRun::Run(input)
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
Method::Notify { message } => {
|
||||||
|
let _ = event_tx.send(Event::Notify {
|
||||||
|
message: message.clone(),
|
||||||
|
});
|
||||||
|
pod.push_notify(message);
|
||||||
|
// RUNNING / Paused: the buffer push is the entire
|
||||||
|
// operation; an in-flight turn (or the next
|
||||||
|
// Resume/Run) will drain it at its next
|
||||||
|
// pre_llm_request. IDLE: auto-start a turn so the LLM
|
||||||
|
// sees the buffered notification(s) without a human
|
||||||
|
// Run.
|
||||||
|
if shared_state.get_status() == PodStatus::Idle {
|
||||||
|
pending = Some(PendingRun::RunForNotification);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Method::Resume => {
|
Method::Resume => {
|
||||||
|
|
@ -557,40 +629,7 @@ impl PodController {
|
||||||
});
|
});
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
set_controller_status(
|
pending = Some(PendingRun::Resume);
|
||||||
&shared_state,
|
|
||||||
&runtime_dir,
|
|
||||||
&event_tx,
|
|
||||||
PodStatus::Running,
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
|
|
||||||
let (new_status, shutdown) = run_with_cancel_support(
|
|
||||||
pod.resume(),
|
|
||||||
&mut method_rx,
|
|
||||||
&event_tx,
|
|
||||||
&cancel_tx,
|
|
||||||
&shared_state,
|
|
||||||
¬ify_buffer,
|
|
||||||
self_parent_socket.as_ref(),
|
|
||||||
&spawner_name,
|
|
||||||
&spawned_registry,
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
|
|
||||||
finish_controller_run(
|
|
||||||
&mut pod,
|
|
||||||
&shared_state,
|
|
||||||
&runtime_dir,
|
|
||||||
&event_tx,
|
|
||||||
new_status,
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
|
|
||||||
if shutdown {
|
|
||||||
let _ = event_tx.send(Event::Shutdown);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Method::Cancel => {
|
Method::Cancel => {
|
||||||
|
|
@ -602,11 +641,9 @@ impl PodController {
|
||||||
|
|
||||||
Method::Pause => {
|
Method::Pause => {
|
||||||
// Already paused → idempotent no-op. Otherwise the
|
// Already paused → idempotent no-op. Otherwise the
|
||||||
// Pod is Idle (Running turns go through
|
// Pod is Idle (Running turns go through `drive_turn`,
|
||||||
// `run_with_cancel_support`, not this outer match), so
|
// not this outer match), so there is nothing to pause.
|
||||||
// there is nothing to pause.
|
if shared_state.get_status() != PodStatus::Paused {
|
||||||
let status = shared_state.get_status();
|
|
||||||
if status != PodStatus::Paused {
|
|
||||||
let _ = event_tx.send(Event::Error {
|
let _ = event_tx.send(Event::Error {
|
||||||
code: ErrorCode::NotRunning,
|
code: ErrorCode::NotRunning,
|
||||||
message: "Pod is not running".into(),
|
message: "Pod is not running".into(),
|
||||||
|
|
@ -620,18 +657,18 @@ impl PodController {
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetHistory / ListCompletions are handled at the socket
|
// GetHistory / ListCompletions are handled at the socket
|
||||||
// layer (direct response). If they somehow reach the
|
// layer (direct response). If they reach the controller,
|
||||||
// controller, ignore them.
|
// ignore them.
|
||||||
Method::GetHistory | Method::ListCompletions { .. } => {}
|
Method::GetHistory | Method::ListCompletions { .. } => {}
|
||||||
|
|
||||||
Method::PodEvent(event) => {
|
Method::PodEvent(event) => {
|
||||||
// Echo the received event to all subscribers so
|
// Echo the received event to all subscribers so every
|
||||||
// every client sees the input that drove any
|
// client sees the input that drove any following
|
||||||
// following auto-kicked turn.
|
// auto-kicked turn.
|
||||||
let _ = event_tx.send(Event::PodEvent(event.clone()));
|
let _ = event_tx.send(Event::PodEvent(event.clone()));
|
||||||
// (1) system side effects — idempotent and
|
// (1) system side effects — idempotent and tolerant of
|
||||||
// tolerant of out-of-order delivery (e.g.
|
// out-of-order delivery (e.g. `TurnEnded` arriving
|
||||||
// `TurnEnded` arriving after `ShutDown`).
|
// after `ShutDown`).
|
||||||
crate::ipc::event::apply_event_side_effects(
|
crate::ipc::event::apply_event_side_effects(
|
||||||
&event,
|
&event,
|
||||||
&spawned_registry,
|
&spawned_registry,
|
||||||
|
|
@ -639,50 +676,16 @@ impl PodController {
|
||||||
&self_parent_socket,
|
&self_parent_socket,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
// (2) render a one-line summary and push it
|
// (2) render a one-line summary and push it into the
|
||||||
// into the notification buffer; the next LLM
|
// notification buffer; the next LLM request will
|
||||||
// request will inject it as a system message
|
// inject it as a system message via
|
||||||
// via `PodInterceptor::pre_llm_request`.
|
// `PodInterceptor::pre_llm_request`.
|
||||||
let text = crate::ipc::event::render_event(&event);
|
pod.push_notify(crate::ipc::event::render_event(&event));
|
||||||
pod.push_notify(text);
|
|
||||||
// Auto-kick a turn if the Pod is idle so the
|
// Auto-kick a turn if the Pod is idle so the
|
||||||
// notification is not stranded. Matches the
|
// notification is not stranded. Matches the
|
||||||
// `Method::Notify` idle path.
|
// `Method::Notify` idle path.
|
||||||
if shared_state.get_status() == PodStatus::Idle {
|
if shared_state.get_status() == PodStatus::Idle {
|
||||||
set_controller_status(
|
pending = Some(PendingRun::RunForNotification);
|
||||||
&shared_state,
|
|
||||||
&runtime_dir,
|
|
||||||
&event_tx,
|
|
||||||
PodStatus::Running,
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
|
|
||||||
let (new_status, shutdown) = run_with_cancel_support(
|
|
||||||
pod.run_for_notification(),
|
|
||||||
&mut method_rx,
|
|
||||||
&event_tx,
|
|
||||||
&cancel_tx,
|
|
||||||
&shared_state,
|
|
||||||
¬ify_buffer,
|
|
||||||
self_parent_socket.as_ref(),
|
|
||||||
&spawner_name,
|
|
||||||
&spawned_registry,
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
|
|
||||||
finish_controller_run(
|
|
||||||
&mut pod,
|
|
||||||
&shared_state,
|
|
||||||
&runtime_dir,
|
|
||||||
&event_tx,
|
|
||||||
new_status,
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
|
|
||||||
if shutdown {
|
|
||||||
let _ = event_tx.send(Event::Shutdown);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -693,12 +696,11 @@ impl PodController {
|
||||||
// staging writes and consolidation cleanups are not abandoned.
|
// staging writes and consolidation cleanups are not abandoned.
|
||||||
pod.wait_for_memory_jobs().await;
|
pod.wait_for_memory_jobs().await;
|
||||||
|
|
||||||
// Report upward that this Pod is stopping before the
|
// Report upward that this Pod is stopping before the controller
|
||||||
// controller task exits. Awaited (not fire-and-forget):
|
// task exits. Awaited (not fire-and-forget): after `shutdown_tx.send`
|
||||||
// after `shutdown_tx.send` the process may exit quickly,
|
// the process may exit quickly, and a spawned task would be killed
|
||||||
// and a spawned task would be killed mid-send. The
|
// mid-send. The `connect_and_send` helper enforces a 5 s timeout so
|
||||||
// `connect_and_send` helper enforces a 5 s timeout so a
|
// a stuck parent cannot block process exit indefinitely.
|
||||||
// stuck parent cannot block process exit indefinitely.
|
|
||||||
if let Some(parent) = self_parent_socket.as_ref() {
|
if let Some(parent) = self_parent_socket.as_ref() {
|
||||||
if let Err(e) = crate::ipc::event::send_pod_event(
|
if let Err(e) = crate::ipc::event::send_pod_event(
|
||||||
parent,
|
parent,
|
||||||
|
|
@ -713,29 +715,26 @@ impl PodController {
|
||||||
}
|
}
|
||||||
|
|
||||||
let _ = shutdown_tx.send(());
|
let _ = shutdown_tx.send(());
|
||||||
});
|
|
||||||
|
|
||||||
Ok((handle, shutdown_rx))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Runs a Pod future while concurrently processing incoming methods.
|
/// Drives a Pod future (one in-flight turn) while concurrently
|
||||||
///
|
/// processing incoming methods through an inner select! arm. Returns
|
||||||
/// Returns `(final_status, shutdown_requested)`.
|
/// `(final_status, shutdown_requested)`.
|
||||||
///
|
///
|
||||||
/// `parent_socket` / `self_name` drive upward `PodEvent` reports
|
/// `parent_socket` / `self_name` drive upward `PodEvent` reports
|
||||||
/// (`TurnEnded` on a clean Finished, `Errored` on a worker failure).
|
/// (`TurnEnded` on a clean Finished, `Errored` on a worker failure).
|
||||||
/// `None` parent skips the send (top-level Pod). Transient method
|
/// `None` parent skips the send (top-level Pod). Transient method
|
||||||
/// rejections such as `AlreadyRunning` are intentionally NOT reported
|
/// rejections such as `AlreadyRunning` are intentionally NOT reported
|
||||||
/// as `Errored` — only the worker-execution `Err` branch below fires.
|
/// as `Errored` — only the worker-execution `Err` branch below fires.
|
||||||
async fn run_with_cancel_support<F>(
|
#[allow(clippy::too_many_arguments)]
|
||||||
|
async fn drive_turn<F>(
|
||||||
pod_future: F,
|
pod_future: F,
|
||||||
method_rx: &mut mpsc::Receiver<Method>,
|
method_rx: &mut mpsc::Receiver<Method>,
|
||||||
event_tx: &broadcast::Sender<Event>,
|
event_tx: &broadcast::Sender<Event>,
|
||||||
cancel_tx: &mpsc::Sender<()>,
|
cancel_tx: &mpsc::Sender<()>,
|
||||||
shared_state: &Arc<PodSharedState>,
|
shared_state: &Arc<PodSharedState>,
|
||||||
notify_buffer: &NotifyBuffer,
|
notify_buffer: &NotifyBuffer,
|
||||||
parent_socket: Option<&std::path::PathBuf>,
|
parent_socket: Option<&PathBuf>,
|
||||||
self_name: &str,
|
self_name: &str,
|
||||||
spawned_registry: &Arc<SpawnedPodRegistry>,
|
spawned_registry: &Arc<SpawnedPodRegistry>,
|
||||||
) -> (PodStatus, bool)
|
) -> (PodStatus, bool)
|
||||||
|
|
|
||||||
|
|
@ -1,65 +0,0 @@
|
||||||
# Pod: イベントハンドラからターン起動を分離する
|
|
||||||
|
|
||||||
## 背景
|
|
||||||
|
|
||||||
`crates/pod/src/controller.rs` の controller タスクは、 outer loop の Method ハンドラ内に **イベント処理とターン起動を同居** させている。
|
|
||||||
|
|
||||||
該当する箇所:
|
|
||||||
|
|
||||||
- `Method::Run` → outer arm 内で `run_with_cancel_support(pod.run(...)).await` と `finish_controller_run.await` を呼ぶ
|
|
||||||
- `Method::Notify` (Idle) → outer arm 内で `set_controller_status(Running).await` のあと `run_with_cancel_support(pod.run_for_notification()).await` / `finish_controller_run.await`
|
|
||||||
- `Method::PodEvent` (Idle) → 同上 (副作用処理 `apply_event_side_effects.await` も同じ arm 内)
|
|
||||||
- `Method::Resume` → 同上
|
|
||||||
|
|
||||||
ハンドラ自身が「ターンを丸ごと await する長時間処理」 を抱えているため、 以下が起きる:
|
|
||||||
|
|
||||||
- 同じターン起動コード (`set_controller_status` → `run_with_cancel_support` → `finish_controller_run`) が 4 箇所に重複する
|
|
||||||
- ターン起動の発火元 (Run / Notify / PodEvent / Resume) と起動本体が分離されていないため、 起動条件の変更が複数 arm に波及する
|
|
||||||
- ターン起動に必要な前処理 (history への user message append / NotifyBuffer への push / status 遷移) も各 arm に分散している
|
|
||||||
|
|
||||||
ターン起動を outer loop の周回トップに集約し、 各イベントハンドラは「次のターンに渡す入力をその場で確定 (history append / NotifyBuffer push) + `needs_run` フラグセット」 までに留めることで、 起動コードの重複と入力経路の分散をなくす。
|
|
||||||
|
|
||||||
加えて、 `PodController::spawn` 自体が約 615 行に膨らんでおり、「組み立て (channel/runtime_dir/Worker hook 配線/ツール登録/PodHandle 構築)」と「実行ループ (controller タスク)」が単一関数に同居している。 outer loop を上記の形に書き直すタイミングで、 spawn の構造分解も同時に行う方が、 重複コードの統合先が見やすくなる。
|
|
||||||
|
|
||||||
なお、 別途観測されている「auto-kick されたターンの内部で controller がデッドロックする現象」 はこの整理では解決しない (inner select! のアーム内 await が pending する経路は構造上残る)。 根本原因の特定は別チケットの対象。
|
|
||||||
|
|
||||||
## 要件
|
|
||||||
|
|
||||||
- イベントハンドラは **「状態更新と `needs_run` フラグ立て」 まで** にとどめる。ハンドラ内で `run_with_cancel_support.await` / `finish_controller_run.await` を呼ばない
|
|
||||||
- outer loop の各周回はまず `needs_run` を評価し、 立っていればターン起動 (`run_with_cancel_support` → `finish_controller_run`) を実行してフラグを降ろす。 立っていなければ `method_rx.recv().await` で次の Method を待つ
|
|
||||||
- 既存の inner select! によるターン中の Method 並行受信は維持する。 ターン本体の借用構造 (`&mut Pod` を Worker が抱える) も変更しない
|
|
||||||
- `needs_run` を立てる契機は最低限以下を含める (= 現状 auto-kick している経路):
|
|
||||||
- `Method::Run`
|
|
||||||
- `Method::Resume`
|
|
||||||
- `Method::Notify` (Idle 時のみ)
|
|
||||||
- `Method::PodEvent` (Idle 時のみ)
|
|
||||||
- 「次のターンの起動意図と入力」 は `needs_run` を起動意図を持つ enum として表現する:
|
|
||||||
- `Run { input }` / `InterruptAndRun { input }` / `RunForNotification` / `Resume`
|
|
||||||
- `Method::Notify` / `Method::PodEvent` の Idle 経路は NotifyBuffer に push したうえで `RunForNotification` を立てる (現状の `pod.run_for_notification()` が NotifyBuffer から自動取得する挙動に乗るため、 enum に入力を載せる必要はない)
|
|
||||||
- `Worker::run` / `Pod::run` の入力受け取り API には触らない (interceptor の `on_prompt_submit` cancel / 書き換え / extras ordering の invariant に踏み込まない)
|
|
||||||
- Pause / Shutdown / Cancel はハンドラ内で完結する (フラグ化しない、 既存通り即時処理)
|
|
||||||
- Running 中に来た `Method::Notify` / `Method::PodEvent` の挙動 (NotifyBuffer に積むだけ、 副作用は実行) は変えない
|
|
||||||
- 上記改修に合わせて `PodController::spawn` を以下の責務単位に分解する:
|
|
||||||
- 初期化 (channel 群 / `RuntimeDir` / pod-immutable snapshot / `SpawnedPodRegistry` / `alerter` 装着 / bash-output scope)
|
|
||||||
- Worker への event bridge コールバック配線 (`on_turn_*` / `on_*_block` / `on_tool_result` / `on_usage` / `on_warning` / `on_history_append` 等)
|
|
||||||
- ツール登録 (builtin / memory / spawn orchestration)
|
|
||||||
- 初期ファイル書き出し + `PodSharedState` / `PodHandle` 構築 + `SocketServer` 起動
|
|
||||||
- `controller_loop` — Idle/Paused 状態の event loop + 後処理 (現 `tokio::spawn(async move { loop { ... } })` の本体)
|
|
||||||
- `drive_turn` — Running 状態の event loop。 現 `run_with_cancel_support` を改名し、 `controller_loop` と同格の関数として並べる (「cancel support」 という実装詳細名から役割ベースの名前に改める)
|
|
||||||
- 分解後の `spawn` はこの順序で各責務を呼び出す薄いフローになる。 Idle/Paused 側 (`controller_loop`) と Running 側 (`drive_turn`) の event loop が同格の関数として並ぶ形に揃える
|
|
||||||
|
|
||||||
## 完了条件
|
|
||||||
|
|
||||||
- `Method::Run` / `Method::Resume` / `Method::Notify(Idle)` / `Method::PodEvent(Idle)` のいずれも、 ターン起動が outer loop の周回トップ 1 箇所に集約されており、 ハンドラ側にはターン起動コードが残っていない
|
|
||||||
- 各イベントハンドラの async body 内に「ターン丸ごと」 や `finish_controller_run` 等の長時間 await が無い
|
|
||||||
- 既存挙動が変わらない (どの Method で auto-kick されるか、 ターン中の Cancel / Pause / Shutdown が効くか、 NotifyBuffer に積まれた内容が次ターンで反映されるか、 ターン起動順序)
|
|
||||||
- `PodController::spawn` が責務単位の関数列を順に呼ぶ薄いフローになっており、 単一関数の中に組み立てと実行ループが同居していない
|
|
||||||
- Idle/Paused 状態の event loop (`controller_loop`) と Running 状態の event loop (`drive_turn`) が同格の関数として並んでおり、 名前が役割を表している (`run_with_cancel_support` の名前は残らない)
|
|
||||||
- `crates/pod/tests/controller_test.rs` の既存テストが通る
|
|
||||||
|
|
||||||
## 範囲外
|
|
||||||
|
|
||||||
- 観測されたデッドロックの根本原因特定 (別チケット予定)
|
|
||||||
- `Pod` 構造体の `&mut self` borrow を分解して outer loop を完全 dispatcher 化する大改修 (将来検討)
|
|
||||||
- Cancel / Shutdown の専用チャネル化 (controller が固まっている時には別経路でも効かないため、 メリット薄と判断して見送り)
|
|
||||||
- `Worker::run` / `Pod::run` の入力なし API 化 (controller の dispatch を bool に圧縮するメリットに対し、 worker interceptor invariant を壊すコストが見合わない)
|
|
||||||
Loading…
Reference in New Issue
Block a user