Compare commits

12 Commits

22 changed files with 1939 additions and 188 deletions

@ -4,10 +4,9 @@
- WorkItem / Thread abstraction for the AI maintainer → [tickets/maintainer-work-items.md](tickets/maintainer-work-items.md)
- Prompt / Workflow evaluation metrics and improvement Offer → [tickets/prompt-eval-metrics.md](tickets/prompt-eval-metrics.md)
- Permission: consolidate on an allow-all default policy → [tickets/permission-default-policy.md](tickets/permission-default-policy.md)
- Pod: automatically roll back empty-response turns (Pause/Cancel after Submit with zero AI response) → [tickets/pod-empty-turn-rollback.md](tickets/pod-empty-turn-rollback.md)
- Pod: generalize Fork from an arbitrary turn and multi-turn rollback → [tickets/pod-session-fork.md](tickets/pod-session-fork.md)
- Pod/TUI: manual rollback entry point → [tickets/manual-turn-rollback.md](tickets/manual-turn-rollback.md)
- Pod: consolidate duplicated inbound PodEvent handling → [tickets/pod-inbound-pod-event-dedup.md](tickets/pod-inbound-pod-event-dedup.md)
- Pod: discovery of past Pods and restore tools → [tickets/pod-discovery-restore-tools.md](tickets/pod-discovery-restore-tools.md)
- llm-worker error resilience
- Continue after a mid-stream failure → [tickets/llm-worker-stream-continuation.md](tickets/llm-worker-stream-continuation.md)
- llm-worker: confine history append to a single callback-driven path → [tickets/worker-history-append-contract.md](tickets/worker-history-append-contract.md)
@ -19,7 +18,6 @@
- Input queueing during a Run → [tickets/tui-input-queue.md](tickets/tui-input-queue.md)
- Model setup wizard for the user manifest → [tickets/tui-user-model-setup.md](tickets/tui-user-model-setup.md)
- Pod stderr is not shown in the TUI when spawn fails → [tickets/tui-spawn-error-surface.md](tickets/tui-spawn-error-surface.md)
- Restore the input text of a rolled-back turn into the edit area → [tickets/tui-empty-turn-restore.md](tickets/tui-empty-turn-restore.md)
- Memory mechanism
- extract / consolidation audit log → [tickets/memory-audit-log.md](tickets/memory-audit-log.md)
- Attention mechanism for the in-session Task tool (nudge with `<system-reminder>` on inactivity) → [tickets/session-todo-reminder.md](tickets/session-todo-reminder.md)
@ -27,3 +25,4 @@
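- Generalize the system-reminder injection mechanism (revisit when a second consumer appears. The `<system-reminder>...</system-reminder>` tag convention was established first by session-todo-reminder. The plan is to append injected Items to worker.history)
- Curb the habitual use of the Bash tool for file editing through its description
- Treat predefined Manifests like Profiles so that Orchestrator/Coder/Researcher can each run with their own model/settings
- UI for switching between the Views of multiple Pods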

@ -62,6 +62,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
PodRunResult::Finished => println!("(finished)"),
PodRunResult::Paused => println!("(paused)"),
PodRunResult::LimitReached => println!("(turn limit reached)"),
PodRunResult::RolledBack => println!("(empty turn rolled back)"),
}
// 5. Extract the assistant's reply from history

@ -1,11 +1,15 @@
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::atomic::Ordering;
use llm_worker::WorkerError;
use llm_worker::llm_client::client::LlmClient;
use session_store::{PodMetadataStore, Store};
use tokio::sync::{broadcast, mpsc, oneshot};
use crate::discovery::{
PodDiscovery, attach_or_restore_pod_tool, inspect_pod_tool, list_visible_pods_tool,
};
use crate::ipc::alerter::Alerter;
use crate::ipc::notify_buffer::NotifyBuffer;
use crate::ipc::server::SocketServer;
@ -78,7 +82,7 @@ async fn finish_controller_run<C, St>(
new_status: PodStatus,
) where
C: LlmClient + Clone + 'static,
-St: Store + Clone + 'static,
+St: Store + PodMetadataStore + Clone + 'static,
{
// history / user_segments are no longer mirrored on PodSharedState —
// clients reconstruct them from `Event::Snapshot` + live
@ -298,8 +302,9 @@ fn wire_event_bridges_on_worker<C, St>(
alerter: &Alerter,
) where
C: LlmClient + Clone + 'static,
-St: Store + Clone + 'static,
+St: Store + PodMetadataStore + Clone + 'static,
{
let ai_activity = pod.ai_activity_counter();
let worker = pod.worker_mut();
let tx = event_tx.clone();
@ -326,15 +331,22 @@ fn wire_event_bridges_on_worker<C, St>(
});
let tx = event_tx.clone();
let activity = ai_activity.clone();
worker.on_text_block(move |block| {
let tx_d = tx.clone();
let activity_d = activity.clone();
block.on_delta(move |text| {
activity_d.fetch_add(1, Ordering::SeqCst);
let _ = tx_d.send(Event::TextDelta {
text: text.to_owned(),
});
});
let tx_s = tx.clone();
let activity_s = activity.clone();
block.on_stop(move |text| {
if !text.is_empty() {
activity_s.fetch_add(1, Ordering::SeqCst);
}
let _ = tx_s.send(Event::TextDone {
text: text.to_owned(),
});
@ -342,18 +354,26 @@ fn wire_event_bridges_on_worker<C, St>(
});
let tx = event_tx.clone();
let activity = ai_activity.clone();
worker.on_thinking_block(move |block| {
// Start fires unconditionally so the TUI can show "Thinking..."
// even when the provider doesn't emit plaintext deltas.
activity.fetch_add(1, Ordering::SeqCst);
let _ = tx.send(Event::ThinkingStart);
let tx_d = tx.clone();
let activity_d = activity.clone();
block.on_delta(move |text| {
activity_d.fetch_add(1, Ordering::SeqCst);
let _ = tx_d.send(Event::ThinkingDelta {
text: text.to_owned(),
});
});
let tx_s = tx.clone();
let activity_s = activity.clone();
block.on_stop(move |text| {
if !text.is_empty() {
activity_s.fetch_add(1, Ordering::SeqCst);
}
let _ = tx_s.send(Event::ThinkingDone {
text: text.to_owned(),
});
@ -361,21 +381,27 @@ fn wire_event_bridges_on_worker<C, St>(
});
let tx = event_tx.clone();
let activity = ai_activity.clone();
worker.on_tool_use_block(move |start, block| {
activity.fetch_add(1, Ordering::SeqCst);
let _ = tx.send(Event::ToolCallStart {
id: start.id.clone(),
name: start.name.clone(),
});
let id_for_delta = start.id.clone();
let tx_d = tx.clone();
let activity_d = activity.clone();
block.on_delta(move |json| {
activity_d.fetch_add(1, Ordering::SeqCst);
let _ = tx_d.send(Event::ToolCallArgsDelta {
id: id_for_delta.clone(),
json: json.to_owned(),
});
});
let tx_s = tx.clone();
let activity_s = activity.clone();
block.on_stop(move |call| {
activity_s.fetch_add(1, Ordering::SeqCst);
let _ = tx_s.send(Event::ToolCallDone {
id: call.id.clone(),
name: call.name.clone(),
@ -385,7 +411,9 @@ fn wire_event_bridges_on_worker<C, St>(
});
let tx = event_tx.clone();
let activity = ai_activity.clone();
worker.on_tool_result(move |result| {
activity.fetch_add(1, Ordering::SeqCst);
let _ = tx.send(Event::ToolResult {
id: result.tool_use_id.clone(),
summary: result.summary.clone(),
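
The hunks above thread an activity counter into every streaming callback. How that counter feeds the new `RolledBack` outcome is not part of this diff, so the following is only a sketch under assumptions: `TurnProbe`, `Outcome`, and `record_stop` are hypothetical names, and the counter is assumed to be an `Arc<AtomicU64>` (the real type behind `ai_activity_counter()` is not shown).

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical outcome type; the real `PodRunResult` has more variants.
#[derive(Debug, PartialEq, Eq)]
enum Outcome {
    Paused,
    RolledBack,
}

/// Hypothetical helper that remembers the counter value at Submit time.
struct TurnProbe {
    counter: Arc<AtomicU64>,
    baseline: u64,
}

impl TurnProbe {
    fn begin(counter: Arc<AtomicU64>) -> Self {
        let baseline = counter.load(Ordering::SeqCst);
        Self { counter, baseline }
    }

    /// True once any callback has bumped the counter during this turn.
    fn saw_activity(&self) -> bool {
        self.counter.load(Ordering::SeqCst) > self.baseline
    }

    /// Assumed policy: an interrupted turn with no AI activity is rolled back.
    fn classify_interrupt(&self) -> Outcome {
        if self.saw_activity() {
            Outcome::Paused
        } else {
            Outcome::RolledBack
        }
    }
}

/// Mirrors the stop callbacks in the diff: an empty block does not count.
fn record_stop(counter: &AtomicU64, text: &str) {
    if !text.is_empty() {
        counter.fetch_add(1, Ordering::SeqCst);
    }
}
```

Comparing against a baseline rather than resetting the counter keeps the callbacks write-only, which seems to fit the way the diff only ever calls `fetch_add`.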
@ -436,7 +464,7 @@ fn register_pod_tools<C, St>(
) -> tools::ScopedFs
where
C: LlmClient + Clone + 'static,
-St: Store + Clone + 'static,
+St: Store + PodMetadataStore + Clone + 'static,
{
// Pod-immutable snapshots taken before the mutable worker borrow
// below so the worker borrow doesn't conflict with reads on `pod`.
@ -448,6 +476,7 @@ where
let memory_config = pod.manifest().memory.clone();
let spawner_name = pod.manifest().pod.name.clone();
let spawner_model = pod.manifest().model.clone();
let pod_store = pod.store().clone();
let self_parent_socket = pod.callback_socket().cloned();
let worker = pod.worker_mut();
@ -492,10 +521,10 @@ where
// the Pod-scoped `SpawnedPodRegistry` (also consumed by the main
// loop's `PodEvent` handler).
worker.register_tool(spawn_pod_tool(
-spawner_name,
+spawner_name.clone(),
spawner_socket,
-runtime_base,
-pwd,
+runtime_base.clone(),
+pwd.clone(),
spawned_registry.clone(),
self_parent_socket,
spawner_model,
@ -505,7 +534,12 @@ where
worker.register_tool(send_to_pod_tool(spawned_registry.clone()));
worker.register_tool(read_pod_output_tool(spawned_registry.clone()));
worker.register_tool(stop_pod_tool(spawned_registry.clone()));
-worker.register_tool(list_pods_tool(spawned_registry));
+worker.register_tool(list_pods_tool(spawned_registry.clone()));
let discovery = PodDiscovery::new(pod_store, spawner_name, runtime_base, pwd, spawned_registry);
worker.register_tool(list_visible_pods_tool(discovery.clone()));
worker.register_tool(inspect_pod_tool(discovery.clone()));
worker.register_tool(attach_or_restore_pod_tool(discovery));
pod.attach_tracker(tracker);
fs_for_view
}
@ -532,11 +566,23 @@ async fn controller_loop<C, St>(
socket_server: SocketServer,
) where
C: LlmClient + Clone + 'static,
-St: Store + Clone + 'static,
+St: Store + PodMetadataStore + Clone + 'static,
{
// Hold socket server alive for the lifetime of the controller task.
let _socket_server = socket_server;
let discovery_runtime_base = runtime_dir
.path()
.parent()
.map(PathBuf::from)
.unwrap_or_else(|| runtime_dir.path().to_path_buf());
let discovery = PodDiscovery::new(
pod.store().clone(),
spawner_name.clone(),
discovery_runtime_base,
pod.pwd().to_path_buf(),
spawned_registry.clone(),
);
let mut pending: Option<PendingRun> = None;
loop {
@ -691,6 +737,66 @@ async fn controller_loop<C, St>(
break;
}
Method::ListVisiblePods => match discovery.list_visible().await {
Ok(pods) => match serde_json::to_value(pods) {
Ok(pods) => {
let _ = event_tx.send(Event::VisiblePods { pods });
}
Err(error) => {
let _ = event_tx.send(Event::Error {
code: ErrorCode::Internal,
message: format!("serialize visible pods: {error}"),
});
}
},
Err(error) => {
let _ = event_tx.send(Event::Error {
code: ErrorCode::InvalidRequest,
message: error.to_string(),
});
}
},
Method::InspectPod { name } => match discovery.inspect(&name).await {
Ok(pod) => match serde_json::to_value(pod) {
Ok(pod) => {
let _ = event_tx.send(Event::PodInspection { pod });
}
Err(error) => {
let _ = event_tx.send(Event::Error {
code: ErrorCode::Internal,
message: format!("serialize pod inspection: {error}"),
});
}
},
Err(error) => {
let _ = event_tx.send(Event::Error {
code: ErrorCode::InvalidRequest,
message: error.to_string(),
});
}
},
Method::AttachOrRestorePod { name } => match discovery.attach_or_restore(&name).await {
Ok(result) => match serde_json::to_value(result) {
Ok(result) => {
let _ = event_tx.send(Event::PodAttachRestore { result });
}
Err(error) => {
let _ = event_tx.send(Event::Error {
code: ErrorCode::Internal,
message: format!("serialize pod attach/restore result: {error}"),
});
}
},
Err(error) => {
let _ = event_tx.send(Event::Error {
code: ErrorCode::InvalidRequest,
message: error.to_string(),
});
}
},
// ListCompletions is handled at the socket layer (direct
// response). If it reaches the controller, ignore it.
Method::ListCompletions { .. } => {}
@ -798,6 +904,7 @@ where
PodRunResult::Finished => (PodStatus::Idle, RunResult::Finished),
PodRunResult::Paused => (PodStatus::Paused, RunResult::Paused),
PodRunResult::LimitReached => (PodStatus::Idle, RunResult::LimitReached),
PodRunResult::RolledBack => (PodStatus::Idle, RunResult::RolledBack),
};
let _ = event_tx.send(Event::RunEnd { result: run_result });
if parent_originated && matches!(run_result, RunResult::Finished) {
@ -865,6 +972,17 @@ where
notify_buffer.push_notify(message);
}
Some(Method::ListCompletions { .. }) => {}
Some(
Method::ListVisiblePods
| Method::InspectPod { .. }
| Method::AttachOrRestorePod { .. },
) => {
let _ = event_tx.send(Event::Error {
code: ErrorCode::AlreadyRunning,
message: "Pod discovery requests are only handled while the Pod is idle or paused"
.into(),
});
}
Some(Method::PodEvent(event)) => {
// mpsc is consume-once, so we cannot defer this
// to the next main-loop iteration — drop here
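
The three discovery arms added to the idle loop above share one shape: run the request, serialize the result, and emit either a payload event, an `Internal` error (serialization failed), or an `InvalidRequest` error (the request itself failed). A possible way to factor that out is sketched here. `Reply`, `Code`, and `discovery_reply` are hypothetical stand-ins, and the `encode` closure takes the place of `serde_json::to_value` so the example needs only the standard library.

```rust
use std::fmt::Display;

/// Hypothetical stand-in for the two `ErrorCode` values used by these arms.
#[derive(Debug, PartialEq, Eq)]
enum Code {
    InvalidRequest,
    Internal,
}

/// Hypothetical stand-in for the events sent on `event_tx`.
#[derive(Debug, PartialEq, Eq)]
enum Reply {
    Payload { kind: &'static str, body: String },
    Error { code: Code, message: String },
}

/// Maps a discovery outcome to a reply, keeping the two failure classes apart.
fn discovery_reply<T, E, S>(
    kind: &'static str,
    outcome: Result<T, E>,
    encode: impl FnOnce(T) -> Result<String, S>,
) -> Reply
where
    E: Display,
    S: Display,
{
    match outcome {
        Ok(value) => match encode(value) {
            Ok(body) => Reply::Payload { kind, body },
            // The request succeeded but its result could not be encoded.
            Err(error) => Reply::Error {
                code: Code::Internal,
                message: format!("serialize {kind}: {error}"),
            },
        },
        // The request itself was rejected (missing state, not visible, ...).
        Err(error) => Reply::Error {
            code: Code::InvalidRequest,
            message: error.to_string(),
        },
    }
}
```

Whether such a helper is worth it depends on how the real `Event` variants differ; the diff keeps the arms spelled out, which is also a reasonable choice for three call sites.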

crates/pod/src/discovery.rs (new file, 977 lines)

@ -0,0 +1,977 @@
//! Pod-state-backed discovery and restore/attach tools.
//!
//! This surface deliberately does not enumerate every Pod on the host. The
//! listing path starts from the caller's visibility set (the caller itself and
//! Pods it spawned according to durable Pod state) and only then reads each
//! Pod's own state. Name-targeted operations distinguish missing state from
//! state that exists but is outside that visibility set.
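
The visibility rule in the module comment above can be sketched with plain collections. This is an illustration only: `Reason`, `LookupError`, `visibility`, and `lookup` are hypothetical names, and the stored Pod names are passed in as a set instead of being read from a `PodMetadataStore`.

```rust
use std::collections::{BTreeMap, BTreeSet};

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Reason {
    SelfPod,
    SpawnedChild,
}

#[derive(Debug, PartialEq, Eq)]
enum LookupError {
    StateMissing,
    NotVisible,
}

/// Builds the caller's visibility set: itself plus the children it spawned.
fn visibility(self_name: &str, children: &[&str]) -> BTreeMap<String, Reason> {
    let mut visible = BTreeMap::new();
    visible.insert(self_name.to_string(), Reason::SelfPod);
    for child in children {
        // `or_insert` keeps `SelfPod` if a child entry repeats the caller's name.
        visible
            .entry((*child).to_string())
            .or_insert(Reason::SpawnedChild);
    }
    visible
}

/// Checks for durable state first, then visibility, so the two failures stay distinct.
fn lookup(
    name: &str,
    stored: &BTreeSet<&str>,
    visible: &BTreeMap<String, Reason>,
) -> Result<Reason, LookupError> {
    if !stored.contains(name) {
        return Err(LookupError::StateMissing);
    }
    visible.get(name).copied().ok_or(LookupError::NotVisible)
}
```

With `parent` as the caller and `child-live` as its only child, a stored but unrelated Pod named `hidden` yields `NotVisible`, while a name with no stored state yields `StateMissing` even if it is in the visibility set.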
use std::collections::BTreeMap;
use std::io;
use std::path::{Path, PathBuf};
use std::process::Stdio;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput};
use protocol::stream::JsonLineReader;
use protocol::{Event, PodStatus};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use session_store::{PodActiveSegmentRef, PodMetadata, PodMetadataStore, SegmentId, SessionId};
use tokio::net::UnixStream;
use tokio::process::Command;
use crate::runtime::pod_registry;
use crate::spawn::registry::SpawnedPodRegistry;
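/// Upper bound for each step of a liveness probe: connecting to the socket
/// and, separately, reading the first event.
const PROBE_TIMEOUT: Duration = Duration::from_millis(500);
/// How long a restored Pod process may take before its socket must answer.
const RESTORE_START_TIMEOUT: Duration = Duration::from_secs(20);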
#[derive(Clone)]
pub struct PodDiscovery<St> {
store: St,
self_pod_name: String,
runtime_base: PathBuf,
cwd: PathBuf,
store_dir: Option<PathBuf>,
spawned_registry: Arc<SpawnedPodRegistry>,
}
impl<St> PodDiscovery<St>
where
St: PodMetadataStore + Clone + Send + Sync + 'static,
{
pub fn new(
store: St,
self_pod_name: String,
runtime_base: PathBuf,
cwd: PathBuf,
spawned_registry: Arc<SpawnedPodRegistry>,
) -> Self {
let store_dir = store.root_dir();
Self {
store,
self_pod_name,
runtime_base,
cwd,
store_dir,
spawned_registry,
}
}
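/// Lists the caller and the Pods it spawned, probing each one's state and
/// socket. A Pod whose state is missing or unreadable is still listed, with
/// `state` set to `Missing` or `Corrupt`.
pub async fn list_visible(&self) -> Result<Vec<VisiblePodItem>, PodDiscoveryError> {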
let visibility = self.visibility().await?;
let mut items = Vec::with_capacity(visibility.visible.len());
for pod_name in visibility.visible.keys() {
items.push(
self.build_item_for_visible_name(pod_name, &visibility)
.await,
);
}
Ok(items)
}
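/// Inspects one Pod by name. Returns `StateMissing` when the store has no
/// state for the name, and `NotVisible` when state exists but the Pod is
/// outside the caller's visibility set.
pub async fn inspect(&self, pod_name: &str) -> Result<PodDetail, PodDiscoveryError> {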
let visibility = self.visibility().await?;
let known_names = self.store.list_names()?;
let state_exists = known_names.iter().any(|n| n == pod_name);
if !state_exists {
return Err(PodDiscoveryError::StateMissing {
pod_name: pod_name.to_string(),
});
}
if !visibility.visible.contains_key(pod_name) {
return Err(PodDiscoveryError::NotVisible {
pod_name: pod_name.to_string(),
});
}
match self.store.read_by_name(pod_name)? {
Some(metadata) => Ok(self.detail_from_metadata(metadata, &visibility).await),
None => Err(PodDiscoveryError::StateMissing {
pod_name: pod_name.to_string(),
}),
}
}
pub async fn attach_or_restore(
&self,
pod_name: &str,
) -> Result<AttachRestoreResult, PodDiscoveryError> {
match self.plan_attach_or_restore(pod_name).await? {
AttachRestorePlan::Attach {
pod_name,
socket_path,
status,
} => Ok(AttachRestoreResult::Attached {
pod_name,
socket_path,
status,
}),
AttachRestorePlan::Restore {
pod_name,
socket_path,
} => {
self.spawn_restore_process(&pod_name, &socket_path).await?;
Ok(AttachRestoreResult::Restored {
pod_name,
socket_path,
})
}
}
}
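/// Decides between attaching and restoring without spawning anything.
/// A reachable socket wins. Otherwise the Pod needs an active session and
/// segment; a segment lock either redirects to its live owner or is
/// reported as `LockConflict` when the owner does not answer.
pub async fn plan_attach_or_restore(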
&self,
pod_name: &str,
) -> Result<AttachRestorePlan, PodDiscoveryError> {
let detail = self.inspect(pod_name).await?;
if detail.live.reachable {
return Ok(AttachRestorePlan::Attach {
pod_name: pod_name.to_string(),
socket_path: detail.live.socket_path,
status: detail.live.status,
});
}
let active = detail
.active
.ok_or_else(|| PodDiscoveryError::NotRestorable {
pod_name: pod_name.to_string(),
reason: "pod state has no active session".into(),
})?;
let segment_id = active
.segment_id
.ok_or_else(|| PodDiscoveryError::NotRestorable {
pod_name: pod_name.to_string(),
reason: "pod state has an active session but no active segment yet".into(),
})?;
if let Some(lock) = lookup_segment_lock(segment_id)? {
let lock_live = probe_socket(&lock.socket).await;
return if lock_live.reachable {
Ok(AttachRestorePlan::Attach {
pod_name: lock.pod_name,
socket_path: lock.socket,
status: lock_live.status,
})
} else {
Err(PodDiscoveryError::LockConflict {
pod_name: pod_name.to_string(),
segment_id,
owner_pod: lock.pod_name,
socket_path: lock.socket,
pid: lock.pid,
})
};
}
Ok(AttachRestorePlan::Restore {
pod_name: pod_name.to_string(),
socket_path: self.default_socket_path(pod_name),
})
}
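
The decision order in `plan_attach_or_restore` above can be written as a pure function over already-gathered facts, which makes the precedence easier to see. This is a sketch: `Probe`, `Plan`, `PlanError`, and `plan` are hypothetical, and the socket probes and lock lookup are assumed to have been done by the caller.

```rust
#[derive(Debug, PartialEq, Eq)]
enum Plan {
    /// The Pod's own socket answered.
    AttachSelf,
    /// The segment is locked and the lock owner's socket answered.
    AttachLockOwner,
    Restore,
}

#[derive(Debug, PartialEq, Eq)]
enum PlanError {
    NoActiveSession,
    NoActiveSegment,
    LockConflict,
}

/// Hypothetical flattened inputs to the decision.
struct Probe {
    live_reachable: bool,
    has_active_session: bool,
    has_segment: bool,
    /// `None`: the segment is unlocked. `Some(r)`: locked, owner reachable = `r`.
    lock_owner_reachable: Option<bool>,
}

fn plan(p: &Probe) -> Result<Plan, PlanError> {
    // 1. A live socket always wins, whatever the stored state says.
    if p.live_reachable {
        return Ok(Plan::AttachSelf);
    }
    // 2. Restoring needs an active session and a concrete segment.
    if !p.has_active_session {
        return Err(PlanError::NoActiveSession);
    }
    if !p.has_segment {
        return Err(PlanError::NoActiveSegment);
    }
    // 3. A lock either redirects to its live owner or blocks the restore.
    match p.lock_owner_reachable {
        Some(true) => Ok(Plan::AttachLockOwner),
        Some(false) => Err(PlanError::LockConflict),
        None => Ok(Plan::Restore),
    }
}
```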
async fn visibility(&self) -> Result<VisibilitySet, PodDiscoveryError> {
let mut visible = BTreeMap::new();
let mut child_sockets = BTreeMap::new();
visible.insert(self.self_pod_name.clone(), VisibilityReason::SelfPod);
// Durable parent -> child state is the primary visibility source.
if let Some(metadata) = self.store.read_by_name(&self.self_pod_name)? {
for child in metadata.spawned_children {
visible
.entry(child.pod_name.clone())
.or_insert(VisibilityReason::SpawnedChild);
child_sockets.insert(child.pod_name, child.socket_path);
}
}
// The live in-memory registry covers just-spawned children even if a
// state write failed after the process became reachable. It is an
// additive visibility hint, not the source of Pod metadata.
for record in self.spawned_registry.list().await {
visible
.entry(record.pod_name.clone())
.or_insert(VisibilityReason::SpawnedChild);
child_sockets.insert(record.pod_name, record.socket_path);
}
Ok(VisibilitySet {
visible,
child_sockets,
})
}
async fn build_item_for_visible_name(
&self,
pod_name: &str,
visibility: &VisibilitySet,
) -> VisiblePodItem {
let visibility_reason = visibility.reason_for(pod_name);
match self.store.read_by_name(pod_name) {
Ok(Some(metadata)) => {
let detail = self.detail_from_metadata(metadata, visibility).await;
VisiblePodItem {
pod_name: pod_name.to_string(),
visibility: visibility_reason,
state: PodStateStatus::Readable,
active: detail.active,
live: detail.live,
restore: detail.restore,
spawned_children: detail.spawned_children,
error: None,
}
}
Ok(None) => VisiblePodItem {
pod_name: pod_name.to_string(),
visibility: visibility_reason,
state: PodStateStatus::Missing,
active: None,
live: self.live_for_name(pod_name, None).await,
restore: RestoreInfo::not_possible("pod state missing"),
spawned_children: SpawnedChildrenSummary::default(),
error: None,
},
Err(error) => VisiblePodItem {
pod_name: pod_name.to_string(),
visibility: visibility_reason,
state: PodStateStatus::Corrupt,
active: None,
live: self.live_for_name(pod_name, None).await,
restore: RestoreInfo::not_possible("pod state is unreadable"),
spawned_children: SpawnedChildrenSummary::default(),
error: Some(error.to_string()),
},
}
}
async fn detail_from_metadata(
&self,
metadata: PodMetadata,
visibility: &VisibilitySet,
) -> PodDetail {
let child_socket = visibility.child_socket_for(&metadata.pod_name);
let live = self
.live_for_name(&metadata.pod_name, child_socket.as_deref())
.await;
let restore = self
.restore_info(&metadata.pod_name, metadata.active.as_ref())
.await;
let spawned_children = summarize_spawned_children(&metadata.spawned_children).await;
PodDetail {
pod_name: metadata.pod_name.clone(),
visibility: visibility.reason_for(&metadata.pod_name),
active: metadata.active.map(ActivePointer::from),
live,
restore,
spawned_children,
}
}
async fn restore_info(
&self,
pod_name: &str,
active: Option<&PodActiveSegmentRef>,
) -> RestoreInfo {
let Some(active) = active else {
return RestoreInfo::not_possible("pod state has no active session");
};
let Some(segment_id) = active.segment_id else {
return RestoreInfo::not_possible("active segment is not known yet");
};
match lookup_segment_lock(segment_id) {
Ok(Some(lock)) => RestoreInfo {
possible: false,
restore_name: Some(pod_name.to_string()),
reason: Some(format!(
"segment is currently locked by `{}` (pid {})",
lock.pod_name, lock.pid
)),
},
Ok(None) => RestoreInfo {
possible: true,
restore_name: Some(pod_name.to_string()),
reason: None,
},
Err(error) => RestoreInfo {
possible: false,
restore_name: Some(pod_name.to_string()),
reason: Some(format!("lock lookup failed: {error}")),
},
}
}
async fn live_for_name(&self, pod_name: &str, socket_override: Option<&Path>) -> LiveInfo {
let socket_path = socket_override
.map(Path::to_path_buf)
.unwrap_or_else(|| self.default_socket_path(pod_name));
probe_socket(&socket_path).await
}
fn default_socket_path(&self, pod_name: &str) -> PathBuf {
self.runtime_base.join(pod_name).join("sock")
}
async fn spawn_restore_process(
&self,
pod_name: &str,
socket_path: &Path,
) -> Result<(), PodDiscoveryError> {
let mut command = Command::new(resolve_pod_command());
command
.arg("--pod")
.arg(pod_name)
.arg("--require-pod-state")
.current_dir(&self.cwd)
.stdin(Stdio::null())
.stdout(Stdio::null())
.stderr(Stdio::null())
.process_group(0);
if let Some(store_dir) = &self.store_dir {
command.arg("--store").arg(store_dir);
}
let mut child = command.spawn().map_err(PodDiscoveryError::RestoreSpawn)?;
let deadline = tokio::time::Instant::now() + RESTORE_START_TIMEOUT;
loop {
if probe_socket(socket_path).await.reachable {
tokio::spawn(async move {
let _ = child.wait().await;
});
return Ok(());
}
if let Some(status) = child.try_wait().map_err(PodDiscoveryError::RestoreSpawn)? {
return Err(PodDiscoveryError::RestoreExited { status });
}
if tokio::time::Instant::now() >= deadline {
let _ = child.start_kill();
let _ = child.wait().await;
return Err(PodDiscoveryError::RestoreTimeout);
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
}
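
The wait loop in `spawn_restore_process` above checks three conditions in a fixed order on every tick: socket ready, child exited, deadline passed. A synchronous, standard-library-only analogue is sketched below. `wait_until_ready` and `WaitError` are hypothetical, the closures stand in for `probe_socket` and `child.try_wait()`, and `std::thread::sleep` replaces `tokio::time::sleep`.

```rust
use std::time::{Duration, Instant};

#[derive(Debug, PartialEq, Eq)]
enum WaitError {
    /// The process exited before it became ready (exit code attached).
    Exited(i32),
    Timeout,
}

/// Polls until `is_ready` succeeds, the process exits, or the deadline passes.
fn wait_until_ready(
    mut is_ready: impl FnMut() -> bool,
    mut exit_code: impl FnMut() -> Option<i32>,
    timeout: Duration,
    interval: Duration,
) -> Result<(), WaitError> {
    let deadline = Instant::now() + timeout;
    loop {
        // Readiness is checked first so a fast start never reports a timeout.
        if is_ready() {
            return Ok(());
        }
        // An early exit is reported immediately instead of waiting out the deadline.
        if let Some(code) = exit_code() {
            return Err(WaitError::Exited(code));
        }
        if Instant::now() >= deadline {
            return Err(WaitError::Timeout);
        }
        std::thread::sleep(interval);
    }
}
```

Checking readiness before the deadline means the loop always probes at least once, even with a zero timeout.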
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum VisibilityReason {
SelfPod,
SpawnedChild,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum PodStateStatus {
Readable,
Missing,
Corrupt,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ActivePointer {
pub session_id: SessionId,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub segment_id: Option<SegmentId>,
}
impl From<PodActiveSegmentRef> for ActivePointer {
fn from(value: PodActiveSegmentRef) -> Self {
Self {
session_id: value.session_id,
segment_id: value.segment_id,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct LiveInfo {
pub socket_path: PathBuf,
pub reachable: bool,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub status: Option<PodStatus>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct RestoreInfo {
pub possible: bool,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub restore_name: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub reason: Option<String>,
}
impl RestoreInfo {
fn not_possible(reason: impl Into<String>) -> Self {
Self {
possible: false,
restore_name: None,
reason: Some(reason.into()),
}
}
}
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct SpawnedChildrenSummary {
pub count: usize,
pub reachable: usize,
pub unreachable: usize,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct VisiblePodItem {
pub pod_name: String,
pub visibility: VisibilityReason,
pub state: PodStateStatus,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub active: Option<ActivePointer>,
pub live: LiveInfo,
pub restore: RestoreInfo,
pub spawned_children: SpawnedChildrenSummary,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct PodDetail {
pub pod_name: String,
pub visibility: VisibilityReason,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub active: Option<ActivePointer>,
pub live: LiveInfo,
pub restore: RestoreInfo,
pub spawned_children: SpawnedChildrenSummary,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "action", rename_all = "snake_case")]
pub enum AttachRestorePlan {
Attach {
pod_name: String,
socket_path: PathBuf,
#[serde(default, skip_serializing_if = "Option::is_none")]
status: Option<PodStatus>,
},
Restore {
pod_name: String,
socket_path: PathBuf,
},
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "action", rename_all = "snake_case")]
pub enum AttachRestoreResult {
Attached {
pod_name: String,
socket_path: PathBuf,
#[serde(default, skip_serializing_if = "Option::is_none")]
status: Option<PodStatus>,
},
Restored {
pod_name: String,
socket_path: PathBuf,
},
}
#[derive(Debug, thiserror::Error)]
pub enum PodDiscoveryError {
#[error("pod state missing for `{pod_name}`")]
StateMissing { pod_name: String },
#[error("pod `{pod_name}` is not visible to this Pod")]
NotVisible { pod_name: String },
#[error("pod `{pod_name}` is not restorable: {reason}")]
NotRestorable { pod_name: String, reason: String },
#[error(
"pod `{pod_name}` segment {segment_id} is locked by `{owner_pod}` pid {pid} at {socket_path}"
)]
LockConflict {
pod_name: String,
segment_id: SegmentId,
owner_pod: String,
socket_path: PathBuf,
pid: u32,
},
#[error("store error: {0}")]
Store(#[from] session_store::StoreError),
#[error("scope lock error: {0}")]
ScopeLock(#[from] pod_registry::ScopeLockError),
#[error("failed to launch restore process: {0}")]
RestoreSpawn(io::Error),
#[error("restore process exited before socket became reachable: {status}")]
RestoreExited { status: std::process::ExitStatus },
#[error("restore process did not become reachable before timeout")]
RestoreTimeout,
}
struct VisibilitySet {
visible: BTreeMap<String, VisibilityReason>,
child_sockets: BTreeMap<String, PathBuf>,
}
impl VisibilitySet {
fn reason_for(&self, pod_name: &str) -> VisibilityReason {
self.visible
.get(pod_name)
.cloned()
.unwrap_or(VisibilityReason::SpawnedChild)
}
fn child_socket_for(&self, pod_name: &str) -> Option<PathBuf> {
self.child_sockets.get(pod_name).cloned()
}
}
async fn summarize_spawned_children(
children: &[session_store::PodSpawnedChild],
) -> SpawnedChildrenSummary {
let mut summary = SpawnedChildrenSummary {
count: children.len(),
..Default::default()
};
for child in children {
if probe_socket(&child.socket_path).await.reachable {
summary.reachable += 1;
} else {
summary.unreachable += 1;
}
}
summary
}
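/// Connects to `socket_path` and waits briefly for the initial
/// `Event::Snapshot`. A successful connect alone marks the Pod reachable;
/// `status` stays `None` if no snapshot arrives within `PROBE_TIMEOUT`.
async fn probe_socket(socket_path: &Path) -> LiveInfo {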
match tokio::time::timeout(PROBE_TIMEOUT, UnixStream::connect(socket_path)).await {
Ok(Ok(stream)) => {
let (r, _w) = stream.into_split();
let mut reader = JsonLineReader::new(r);
let status = match tokio::time::timeout(PROBE_TIMEOUT, reader.next::<Event>()).await {
Ok(Ok(Some(Event::Snapshot { status, .. }))) => Some(status),
_ => None,
};
LiveInfo {
socket_path: socket_path.to_path_buf(),
reachable: true,
status,
error: None,
}
}
Ok(Err(error)) => LiveInfo {
socket_path: socket_path.to_path_buf(),
reachable: false,
status: None,
error: Some(error.to_string()),
},
Err(_) => LiveInfo {
socket_path: socket_path.to_path_buf(),
reachable: false,
status: None,
error: Some("connect timed out".into()),
},
}
}
fn lookup_segment_lock(
segment_id: SegmentId,
) -> Result<Option<pod_registry::SegmentLockInfo>, pod_registry::ScopeLockError> {
pod_registry::lookup_segment(segment_id)
}
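/// Returns the Pod binary to launch: `INSOMNIA_POD_COMMAND` when set and
/// non-empty, otherwise `pod` (resolved through `PATH` at spawn time).
fn resolve_pod_command() -> PathBuf {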
if let Ok(cmd) = std::env::var("INSOMNIA_POD_COMMAND")
&& !cmd.is_empty()
{
return PathBuf::from(cmd);
}
PathBuf::from("pod")
}
#[derive(Debug, Deserialize, JsonSchema)]
struct PodNameInput {
/// Pod name to inspect, attach, or restore.
name: String,
}
struct ListVisiblePodsTool<St> {
discovery: PodDiscovery<St>,
}
#[async_trait]
impl<St> Tool for ListVisiblePodsTool<St>
where
St: PodMetadataStore + Clone + Send + Sync + 'static,
{
async fn execute(&self, _input_json: &str) -> Result<ToolOutput, ToolError> {
let items = self
.discovery
.list_visible()
.await
.map_err(discovery_error_to_tool_error)?;
let summary = format!("{} visible pod(s)", items.len());
Ok(ToolOutput {
summary,
content: Some(json_content(&items)?),
})
}
}
struct InspectPodTool<St> {
discovery: PodDiscovery<St>,
}
#[async_trait]
impl<St> Tool for InspectPodTool<St>
where
St: PodMetadataStore + Clone + Send + Sync + 'static,
{
async fn execute(&self, input_json: &str) -> Result<ToolOutput, ToolError> {
let input: PodNameInput = serde_json::from_str(input_json)
.map_err(|e| ToolError::InvalidArgument(format!("invalid InspectPod input: {e}")))?;
let detail = self
.discovery
.inspect(&input.name)
.await
.map_err(discovery_error_to_tool_error)?;
Ok(ToolOutput {
summary: format!("pod `{}` inspected", detail.pod_name),
content: Some(json_content(&detail)?),
})
}
}
struct AttachOrRestorePodTool<St> {
discovery: PodDiscovery<St>,
}
#[async_trait]
impl<St> Tool for AttachOrRestorePodTool<St>
where
St: PodMetadataStore + Clone + Send + Sync + 'static,
{
async fn execute(&self, input_json: &str) -> Result<ToolOutput, ToolError> {
let input: PodNameInput = serde_json::from_str(input_json).map_err(|e| {
ToolError::InvalidArgument(format!("invalid AttachOrRestorePod input: {e}"))
})?;
let result = self
.discovery
.attach_or_restore(&input.name)
.await
.map_err(discovery_error_to_tool_error)?;
let summary = match &result {
AttachRestoreResult::Attached { pod_name, .. } => {
format!("pod `{pod_name}` is live; attached to existing socket")
}
AttachRestoreResult::Restored { pod_name, .. } => {
format!("pod `{pod_name}` restored from pod state")
}
};
Ok(ToolOutput {
summary,
content: Some(json_content(&result)?),
})
}
}
pub fn list_visible_pods_tool<St>(discovery: PodDiscovery<St>) -> ToolDefinition
where
St: PodMetadataStore + Clone + Send + Sync + 'static,
{
Arc::new(move || {
let meta = ToolMeta::new("ListVisiblePods")
.description(
"List Pod state entries visible to this Pod. This is state-backed and does not expose the host-wide Pod universe.",
)
.input_schema(serde_json::json!({
"type": "object",
"properties": {},
"additionalProperties": false,
}));
let tool: Arc<dyn Tool> = Arc::new(ListVisiblePodsTool {
discovery: discovery.clone(),
});
(meta, tool)
})
}
pub fn inspect_pod_tool<St>(discovery: PodDiscovery<St>) -> ToolDefinition
where
St: PodMetadataStore + Clone + Send + Sync + 'static,
{
Arc::new(move || {
let meta = ToolMeta::new("InspectPod")
.description(
"Inspect one visible Pod by name from durable Pod state, distinguishing missing state from not-visible Pods.",
)
.input_schema(serde_json::to_value(schemars::schema_for!(PodNameInput)).unwrap());
let tool: Arc<dyn Tool> = Arc::new(InspectPodTool {
discovery: discovery.clone(),
});
(meta, tool)
})
}
pub fn attach_or_restore_pod_tool<St>(discovery: PodDiscovery<St>) -> ToolDefinition
where
St: PodMetadataStore + Clone + Send + Sync + 'static,
{
Arc::new(move || {
let meta = ToolMeta::new("AttachOrRestorePod")
.description(
"Attach to a visible live Pod, or restore it from Pod state when no live socket is reachable. Missing state is an error.",
)
.input_schema(serde_json::to_value(schemars::schema_for!(PodNameInput)).unwrap());
let tool: Arc<dyn Tool> = Arc::new(AttachOrRestorePodTool {
discovery: discovery.clone(),
});
(meta, tool)
})
}
fn json_content<T: Serialize>(value: &T) -> Result<String, ToolError> {
serde_json::to_string_pretty(value)
.map_err(|e| ToolError::Internal(format!("serialize pod discovery output: {e}")))
}
fn discovery_error_to_tool_error(error: PodDiscoveryError) -> ToolError {
match error {
PodDiscoveryError::StateMissing { .. }
| PodDiscoveryError::NotVisible { .. }
| PodDiscoveryError::NotRestorable { .. } => ToolError::InvalidArgument(error.to_string()),
PodDiscoveryError::LockConflict { .. }
| PodDiscoveryError::Store(_)
| PodDiscoveryError::ScopeLock(_)
| PodDiscoveryError::RestoreSpawn(_)
| PodDiscoveryError::RestoreExited { .. }
| PodDiscoveryError::RestoreTimeout => ToolError::ExecutionFailed(error.to_string()),
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Mutex;
use manifest::{Permission, ScopeRule};
use protocol::Greeting;
use protocol::stream::JsonLineWriter;
use session_store::{
FsStore, PodSpawnedChild, PodSpawnedScopeRule, new_segment_id, new_session_id,
};
use tempfile::TempDir;
use tokio::net::UnixListener;
use crate::runtime::dir::RuntimeDir;
static ENV_LOCK: Mutex<()> = Mutex::new(());
#[tokio::test(flavor = "current_thread")]
async fn state_backed_visibility_and_attach_restore_planning() {
let _env = ENV_LOCK.lock().unwrap();
let root = TempDir::new().unwrap();
let store_dir = root.path().join("store");
let runtime_base = root.path().join("runtime");
std::fs::create_dir_all(&runtime_base).unwrap();
unsafe {
std::env::set_var("INSOMNIA_RUNTIME_DIR", &runtime_base);
}
let store = FsStore::new(&store_dir).unwrap();
let session_id = new_session_id();
let active_child_segment = new_segment_id();
let pending_session_id = new_session_id();
let live_socket = runtime_base.join("child-live").join("sock");
std::fs::create_dir_all(live_socket.parent().unwrap()).unwrap();
let live_listener = spawn_snapshot_socket(&live_socket).await;
let stale_socket = runtime_base.join("child-stale").join("sock");
let pending_socket = runtime_base.join("child-pending").join("sock");
let parent = PodMetadata {
pod_name: "parent".into(),
active: None,
spawned_children: vec![
child("child-live", &live_socket),
child("child-stale", &stale_socket),
child("child-pending", &pending_socket),
],
};
store.write(&parent).unwrap();
store
.write(&PodMetadata {
pod_name: "child-live".into(),
active: Some(PodActiveSegmentRef::active_segment(
session_id,
active_child_segment,
)),
spawned_children: Vec::new(),
})
.unwrap();
store
.write(&PodMetadata {
pod_name: "child-stale".into(),
active: Some(PodActiveSegmentRef::active_segment(
session_id,
active_child_segment,
)),
spawned_children: Vec::new(),
})
.unwrap();
store
.write(&PodMetadata {
pod_name: "child-pending".into(),
active: Some(PodActiveSegmentRef::pending_segment(pending_session_id)),
spawned_children: Vec::new(),
})
.unwrap();
store
.write(&PodMetadata {
pod_name: "hidden".into(),
active: Some(PodActiveSegmentRef::active_segment(
session_id,
new_segment_id(),
)),
spawned_children: Vec::new(),
})
.unwrap();
// RuntimeDir creates parent runtime files; discovery must still use
// Pod state when spawned_pods.json is absent.
let runtime_dir = Arc::new(RuntimeDir::create(&runtime_base, "parent").await.unwrap());
let runtime_file = runtime_dir.path().join("spawned_pods.json");
assert!(!runtime_file.exists());
let registry = SpawnedPodRegistry::new(runtime_dir);
let discovery = PodDiscovery::new(
store.clone(),
"parent".into(),
runtime_base.clone(),
root.path().to_path_buf(),
registry,
);
let list = discovery.list_visible().await.unwrap();
let names: Vec<_> = list.iter().map(|p| p.pod_name.as_str()).collect();
assert_eq!(
names,
vec!["child-live", "child-pending", "child-stale", "parent"]
);
assert!(!names.contains(&"hidden"));
assert!(
list.iter()
.find(|p| p.pod_name == "child-live")
.unwrap()
.live
.reachable
);
assert!(
!list
.iter()
.find(|p| p.pod_name == "child-stale")
.unwrap()
.live
.reachable
);
let pending = list.iter().find(|p| p.pod_name == "child-pending").unwrap();
assert_eq!(pending.state, PodStateStatus::Readable);
assert_eq!(
pending.active.as_ref().unwrap().session_id,
pending_session_id
);
assert_eq!(pending.active.as_ref().unwrap().segment_id, None);
assert!(!pending.restore.possible);
let hidden_err = discovery.inspect("hidden").await.unwrap_err();
assert!(matches!(hidden_err, PodDiscoveryError::NotVisible { .. }));
let missing_err = discovery.inspect("missing").await.unwrap_err();
assert!(matches!(
missing_err,
PodDiscoveryError::StateMissing { .. }
));
let hidden_restore_err = discovery
.plan_attach_or_restore("hidden")
.await
.unwrap_err();
assert!(matches!(
hidden_restore_err,
PodDiscoveryError::NotVisible { .. }
));
let attach_plan = discovery
.plan_attach_or_restore("child-live")
.await
.unwrap();
assert!(matches!(attach_plan, AttachRestorePlan::Attach { .. }));
let restore_plan = discovery
.plan_attach_or_restore("child-stale")
.await
.unwrap();
assert!(matches!(restore_plan, AttachRestorePlan::Restore { .. }));
let lock_socket = runtime_base.join("lock-owner.sock");
let _guard = pod_registry::install_top_level(
"lock-owner".into(),
std::process::id(),
lock_socket.clone(),
vec![ScopeRule {
target: root.path().to_path_buf(),
permission: Permission::Read,
recursive: true,
}],
active_child_segment,
)
.unwrap();
let locked_err = discovery
.plan_attach_or_restore("child-stale")
.await
.unwrap_err();
assert!(matches!(locked_err, PodDiscoveryError::LockConflict { .. }));
live_listener.abort();
}
/// Build a spawned-child record with a read-only, recursive `/tmp` scope.
fn child(name: &str, socket_path: &Path) -> PodSpawnedChild {
PodSpawnedChild {
pod_name: name.to_string(),
socket_path: socket_path.to_path_buf(),
scope_delegated: vec![PodSpawnedScopeRule {
target: PathBuf::from("/tmp"),
permission: "read".into(),
recursive: true,
}],
callback_address: PathBuf::from("/tmp/parent.sock"),
}
}
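/// Serve a minimal `Event::Snapshot` on `socket_path` for every connection,
/// so discovery sees the Pod behind that socket as live.
async fn spawn_snapshot_socket(socket_path: &Path) -> tokio::task::JoinHandle<()> {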
let _ = std::fs::remove_file(socket_path);
let listener = UnixListener::bind(socket_path).unwrap();
tokio::spawn(async move {
loop {
let Ok((stream, _)) = listener.accept().await else {
break;
};
tokio::spawn(async move {
let mut writer = JsonLineWriter::new(stream);
let _ = writer
.write(&Event::Snapshot {
entries: Vec::new(),
greeting: Greeting {
pod_name: "child-live".into(),
cwd: "/tmp".into(),
provider: "test".into(),
model: "test".into(),
scope_summary: String::new(),
tools: Vec::new(),
context_window: 0,
context_tokens: 0,
},
status: PodStatus::Idle,
})
.await;
});
}
})
}
}
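The test above pins down the planning rules: missing state and a hidden Pod are distinct errors, a reachable socket yields an attach, and a stale socket yields a restore unless a scope lock is held. The sketch below restates those rules over plain booleans. `PodFacts`, `Plan`, `PlanError`, and `plan` are hypothetical names; the order of the checks, and treating a pending segment as `NotRestorable`, are assumptions rather than the crate's implementation.

```rust
/// Hypothetical summary of what discovery knows about one Pod.
#[derive(Debug, Clone, Copy)]
struct PodFacts {
    has_state: bool,
    visible: bool,
    socket_reachable: bool,
    has_active_segment: bool,
    scope_locked: bool,
}

#[derive(Debug, PartialEq)]
enum Plan {
    Attach,
    Restore,
}

#[derive(Debug, PartialEq)]
enum PlanError {
    StateMissing,
    NotVisible,
    NotRestorable,
    LockConflict,
}

/// Decide between attaching to a live Pod and restoring it from state.
/// The check order is an assumption chosen to match the test's outcomes.
fn plan(facts: PodFacts) -> Result<Plan, PlanError> {
    if !facts.has_state {
        return Err(PlanError::StateMissing);
    }
    if !facts.visible {
        return Err(PlanError::NotVisible);
    }
    if facts.socket_reachable {
        return Ok(Plan::Attach);
    }
    if !facts.has_active_segment {
        return Err(PlanError::NotRestorable);
    }
    if facts.scope_locked {
        return Err(PlanError::LockConflict);
    }
    Ok(Plan::Restore)
}
```

Checking for missing state before visibility keeps the two errors distinguishable, which is what the `inspect("hidden")` and `inspect("missing")` assertions rely on.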

View File

@ -1,5 +1,6 @@
pub mod compact;
pub mod controller;
pub mod discovery;
pub mod fs_view;
pub mod hook;
pub mod ipc;

View File

@ -53,6 +53,11 @@ struct Cli {
#[arg(long, value_name = "NAME", conflicts_with_all = ["session", "adopt"])]
pod: Option<String>,
/// Require `--pod` to restore existing Pod state instead of creating a
/// fresh Pod when no state exists. Used by Pod discovery restore flows.
#[arg(long, requires = "pod")]
require_pod_state: bool,
/// Restore a Pod from an existing session. The Pod re-uses the
/// given session id and appends new turns to the same jsonl;
/// concurrent writers are prevented by the pod-registry.
@ -269,6 +274,10 @@ async fn main() -> ExitCode {
}
}
}
Ok(None) if cli.require_pod_state => {
eprintln!("error: pod state missing for {pod_name}");
return ExitCode::FAILURE;
}
Ok(None) => match Pod::from_manifest(manifest, store, loader).await {
Ok(p) => p,
Err(e) => {
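The new `Ok(None) if cli.require_pod_state` arm only works because it sits before the unguarded `Ok(None)` arm. A reduced sketch of that decision, using a hypothetical `StartupAction` enum and `plan_startup` function in place of the real Pod construction:

```rust
/// Hypothetical outcome of the `--pod` startup decision.
#[derive(Debug, PartialEq)]
enum StartupAction {
    Restore,
    FailMissingState,
    CreateFresh,
}

/// `state` stands in for the result of reading Pod state by name.
fn plan_startup(state: Option<&str>, require_pod_state: bool) -> StartupAction {
    match state {
        Some(_) => StartupAction::Restore,
        // The guarded arm must precede the plain `None` arm; otherwise the
        // plain arm would match first and a fresh Pod would be created.
        None if require_pod_state => StartupAction::FailMissingState,
        None => StartupAction::CreateFresh,
    }
}
```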

View File

@ -6,6 +6,7 @@ use arc_swap::ArcSwap;
use llm_worker::Item;
use llm_worker::llm_client::RequestConfig;
use llm_worker::llm_client::client::LlmClient;
use llm_worker::llm_client::types::Role;
use llm_worker::state::Mutable;
use llm_worker::{ToolOutputLimits, UsageRecord, Worker, WorkerError, WorkerResult};
use session_store::{
@ -121,6 +122,24 @@ impl SegmentState {
}
}
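/// Pod state captured before a run so that an empty turn can be undone:
/// lengths to truncate back to, plus values to restore as they were.
struct EmptyTurnRollbackSnapshot {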
history_len: usize,
user_segments_len: usize,
entries_written: usize,
sink_len: usize,
pending_attachments: Vec<SystemItem>,
usage_history_len: usize,
ai_activity_count: usize,
last_run_interrupted: bool,
}
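/// True for items produced on the assistant side: assistant messages, tool
/// calls, tool results, and reasoning. Messages with any other role do not count.
fn is_ai_materialized_item(item: &Item) -> bool {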
match item {
Item::Message { role, .. } => *role == Role::Assistant,
Item::ToolCall { .. } | Item::ToolResult { .. } | Item::Reasoning { .. } => true,
}
}
/// Cheap-cloneable bundle of (store + shared session pointer + sink)
/// handed to the worker callback and the interceptor so they can
/// commit `LogEntry` values directly without going through an mpsc
@ -254,6 +273,12 @@ pub struct Pod<C: LlmClient, St: Store> {
/// notifications, events sent here are NOT replayed to clients that
/// connect after the fact — they are fire-and-forget broadcasts.
event_tx: Option<broadcast::Sender<Event>>,
/// Monotonic counter incremented by worker event bridges when an
/// assistant-side execution artifact becomes visible to clients before
/// it is necessarily committed to history (e.g. streaming text deltas).
/// `Pod::run` uses it to avoid rolling back a turn after the UI has
/// already observed AI output.
ai_activity_counter: Arc<AtomicUsize>,
/// Queue of pending `Method::Notify` notifications awaiting
/// injection into the next LLM request. Shared with the
/// PodInterceptor installed in `ensure_interceptor_installed`.
@ -392,6 +417,7 @@ impl<C: LlmClient + Clone + 'static, St: Store + Clone + 'static> Pod<C, St> {
system_prompt_template: None,
alerter: self.alerter.clone(),
event_tx: self.event_tx.clone(),
ai_activity_counter: self.ai_activity_counter.clone(),
pending_notifies: NotifyBuffer::new(),
pending_attachments: Arc::new(Mutex::new(Vec::<SystemItem>::new())),
scope_allocation: None,
@ -534,6 +560,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
system_prompt_template: None,
alerter: None,
event_tx: None,
ai_activity_counter: Arc::new(AtomicUsize::new(0)),
pending_notifies: NotifyBuffer::new(),
pending_attachments: Arc::new(Mutex::new(Vec::<SystemItem>::new())),
scope_allocation: None,
@ -901,6 +928,12 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
self.event_tx = Some(event_tx);
}
/// Shared activity counter incremented by worker event bridges when any
/// assistant-side output is surfaced before history persistence.
pub fn ai_activity_counter(&self) -> Arc<AtomicUsize> {
self.ai_activity_counter.clone()
}
fn alert(&self, level: AlertLevel, source: AlertSource, message: String) {
if let Some(n) = self.alerter.as_ref() {
n.alert(level, source, message);
@ -1236,6 +1269,75 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
Ok(())
}
/// Capture the pre-submit state that `rollback_empty_turn` restores.
fn capture_empty_turn_rollback_snapshot(&self) -> EmptyTurnRollbackSnapshot {
let pending_attachments = self
.pending_attachments
.lock()
.expect("pending_attachments poisoned")
.clone();
let usage_history_len = self
.usage_history
.lock()
.expect("usage_history poisoned")
.len();
EmptyTurnRollbackSnapshot {
history_len: self.worker().history().len(),
user_segments_len: self.user_segments.len(),
entries_written: self.segment_state.entries_written(),
sink_len: self.sink.len(),
pending_attachments,
usage_history_len,
ai_activity_count: self.ai_activity_counter.load(Ordering::SeqCst),
last_run_interrupted: self.worker().last_run_interrupted(),
}
}
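/// Roll back only when the run was cancelled, no assistant activity was
/// surfaced to clients, and no assistant-side item was appended to history
/// since the snapshot.
fn should_rollback_empty_turn(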
&self,
result: &Result<WorkerResult, WorkerError>,
snapshot: &EmptyTurnRollbackSnapshot,
) -> bool {
if !matches!(result, Err(WorkerError::Cancelled)) {
return false;
}
if self.ai_activity_counter.load(Ordering::SeqCst) != snapshot.ai_activity_count {
return false;
}
!self.worker().history()[snapshot.history_len..]
.iter()
.any(is_ai_materialized_item)
}
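/// Restore in-memory state first, then truncate the persisted segment log
/// and the sink mirror back to the snapshot.
fn rollback_empty_turn(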
&mut self,
snapshot: EmptyTurnRollbackSnapshot,
) -> Result<(), StoreError> {
self.worker_mut()
.history_mut()
.truncate(snapshot.history_len);
self.worker_mut()
.set_last_run_interrupted(snapshot.last_run_interrupted);
self.user_segments.truncate(snapshot.user_segments_len);
*self
.pending_attachments
.lock()
.expect("pending_attachments poisoned") = snapshot.pending_attachments;
self.usage_history
.lock()
.expect("usage_history poisoned")
.truncate(snapshot.usage_history_len);
let _ = self.usage_tracker.drain();
let _ = self.metrics_tracker.drain();
let loc = self.segment_state.location();
self.store
.truncate(loc.session_id, loc.segment_id, snapshot.entries_written)?;
self.segment_state
.set_entries_written(snapshot.entries_written);
self.sink.truncate_silent(snapshot.sink_len);
Ok(())
}
/// Send user input and run until the LLM turn completes.
///
/// `input` is a typed segment list (see [`protocol::Segment`]). The
@ -1270,6 +1372,8 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
self.prepare_for_run().await?;
let rollback_snapshot = self.capture_empty_turn_rollback_snapshot();
// IDLE → active marker. Commits first so the next UserInput entry
// is contained inside this Invoke range. See `tickets/invoke-turn-llmcall-semantics.md`.
self.commit_entry(LogEntry::Invoke {
@ -1311,6 +1415,11 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
let result = locked.run(flattened).await;
self.worker = Some(locked.unlock());
if self.should_rollback_empty_turn(&result, &rollback_snapshot) {
self.rollback_empty_turn(rollback_snapshot)?;
return Ok(PodRunResult::RolledBack);
}
self.handle_worker_result(result, history_before).await
}
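The rollback path above combines three ideas: record lengths before the run, refuse to roll back once any assistant output has been surfaced or stored, and otherwise truncate back to the recorded lengths. The sketch below shows that shape with a single history vector. `Item`, `Snapshot`, and `Turns` are hypothetical simplifications, and the real Pod also restores attachments, usage history, the store, and the sink.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

#[derive(Debug, Clone, PartialEq)]
enum Item {
    User(String),
    Assistant(String),
}

/// Values captured before the turn starts.
struct Snapshot {
    history_len: usize,
    activity: usize,
}

struct Turns {
    history: Vec<Item>,
    /// Bumped whenever assistant output is shown to a client, even if it
    /// has not been committed to `history` yet.
    activity: Arc<AtomicUsize>,
}

impl Turns {
    fn new() -> Self {
        Turns { history: Vec::new(), activity: Arc::new(AtomicUsize::new(0)) }
    }

    fn snapshot(&self) -> Snapshot {
        Snapshot {
            history_len: self.history.len(),
            activity: self.activity.load(Ordering::SeqCst),
        }
    }

    /// Roll back only a cancelled turn with no visible or stored AI output.
    fn should_rollback(&self, cancelled: bool, snap: &Snapshot) -> bool {
        if !cancelled {
            return false;
        }
        if self.activity.load(Ordering::SeqCst) != snap.activity {
            return false;
        }
        !self.history[snap.history_len..]
            .iter()
            .any(|item| matches!(item, Item::Assistant(_)))
    }

    fn rollback(&mut self, snap: Snapshot) {
        self.history.truncate(snap.history_len);
    }
}
```

The separate counter matters because streamed text can reach a client before anything lands in history; comparing history alone would miss that case.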
@ -2847,6 +2956,7 @@ where
system_prompt_template: common.system_prompt_template,
alerter: None,
event_tx: None,
ai_activity_counter: Arc::new(AtomicUsize::new(0)),
pending_notifies: NotifyBuffer::new(),
pending_attachments: Arc::new(Mutex::new(Vec::<SystemItem>::new())),
scope_allocation: Some(scope_allocation),
@ -2923,6 +3033,7 @@ where
system_prompt_template: common.system_prompt_template,
alerter: None,
event_tx: None,
ai_activity_counter: Arc::new(AtomicUsize::new(0)),
pending_notifies: NotifyBuffer::new(),
pending_attachments: Arc::new(Mutex::new(Vec::<SystemItem>::new())),
scope_allocation: Some(scope_allocation),
@ -3098,6 +3209,7 @@ where
system_prompt_template: None,
alerter: None,
event_tx: None,
ai_activity_counter: Arc::new(AtomicUsize::new(0)),
pending_notifies: NotifyBuffer::new(),
pending_attachments: Arc::new(Mutex::new(Vec::<SystemItem>::new())),
scope_allocation: Some(scope_allocation),
@ -3184,6 +3296,8 @@ pub enum PodRunResult {
Paused,
/// The worker reached its configured max_turns limit.
LimitReached,
/// The submit-time user turn was rolled back because no AI output was materialized.
RolledBack,
}
impl From<WorkerResult> for PodRunResult {

View File

@ -157,6 +157,16 @@ impl SegmentLogSink {
*mirror = entries;
}
/// Truncate the mirror to `entries_len` entries without broadcasting
/// anything to subscribers.
pub fn truncate_silent(&self, entries_len: usize) {
let mut mirror = self
.inner
.mirror
.lock()
.expect("session log mirror mutex poisoned");
mirror.truncate(entries_len);
}
/// Atomically read the current mirror and subscribe to subsequent
/// commits. The returned snapshot and receiver split the entry
/// timeline into a duplicate-free, gap-free prefix/suffix pair.

View File

@ -129,7 +129,11 @@ impl SpawnedPodRegistry {
runtime_dir.write_spawned_pods(&records).await?;
let state_writer = pod_state_writer(store, pod_name.clone());
if pruned || metadata.is_some() {
// Runtime spawned-pod records are a live registry for ListPods and
// cursor/scope cleanup; durable Pod state remains the discovery source
// for later attach/restore, so do not delete unreachable children from
// Pod state just because their sockets are gone.
if metadata.is_none() || !pruned {
state_writer(&records)?;
}
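The new guard writes durable Pod state unless metadata already existed and children were pruned. The truth table is small enough to spell out; `should_write_durable_state` is a hypothetical helper that mirrors the `metadata.is_none() || !pruned` condition and is not part of the crate.

```rust
/// Mirror of the guard `metadata.is_none() || !pruned`.
/// `had_metadata` corresponds to `metadata.is_some()`.
fn should_write_durable_state(had_metadata: bool, pruned: bool) -> bool {
    !had_metadata || !pruned
}
```

Only the combination "metadata existed and something was pruned" skips the write, which is exactly the case where writing would drop unreachable children from durable state.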

View File

@ -1470,3 +1470,204 @@ async fn paused_then_run_closes_orphan_tool_use_for_next_request() {
"system note must precede new user message"
);
}
fn item_text_contains(item: &Item, needle: &str) -> bool {
item.as_text().unwrap_or_default().contains(needle)
}
async fn snapshot_contains_user_input(handle: &PodHandle, needle: &str) -> bool {
let stream = tokio::net::UnixStream::connect(handle.runtime_dir.socket_path())
.await
.unwrap();
let (reader, _writer) = stream.into_split();
let mut reader = protocol::stream::JsonLineReader::new(reader);
loop {
let event = reader.next::<Event>().await.unwrap().unwrap();
match event {
Event::Snapshot { entries, .. } => {
return entries.into_iter().any(|value| {
let entry: session_store::LogEntry =
serde_json::from_value(value).expect("LogEntry deserialise");
match entry {
session_store::LogEntry::UserInput { segments, .. } => {
protocol::Segment::flatten_to_text(&segments).contains(needle)
}
_ => false,
}
});
}
Event::Alert(_) => continue,
other => panic!("expected Snapshot first, got {other:?}"),
}
}
}
#[tokio::test]
async fn empty_turn_cancel_rolls_back_submit_entries_and_emits_signal() {
let client = MockClient::sequential(vec![MockResponse::Hang(vec![])]);
let pod = make_pod(client).await;
let handle = spawn_controller(pod).await;
let mut rx = handle.subscribe();
handle.send(Method::run_text("rollback me")).await.unwrap();
wait_for_status(&handle, PodStatus::Running).await;
handle.send(Method::Cancel).await.unwrap();
assert!(
drain_until(&mut rx, std::time::Duration::from_secs(2), |e| matches!(
e,
Event::RunEnd {
result: protocol::RunResult::RolledBack
}
))
.await,
"expected RunEnd::RolledBack after empty cancel"
);
wait_for_status(&handle, PodStatus::Idle).await;
let history = history_from_sink(&handle);
assert!(
!history
.iter()
.any(|item| item_text_contains(item, "rollback me")),
"rolled-back user input must not remain in history: {history:?}"
);
}
#[tokio::test]
async fn empty_turn_pause_rolls_back_and_snapshot_does_not_restore_input() {
let client = MockClient::sequential(vec![MockResponse::Hang(vec![])]);
let pod = make_pod(client).await;
let handle = spawn_controller(pod).await;
let mut rx = handle.subscribe();
handle
.send(Method::run_text("pause rollback"))
.await
.unwrap();
wait_for_status(&handle, PodStatus::Running).await;
handle.send(Method::Pause).await.unwrap();
assert!(
drain_until(&mut rx, std::time::Duration::from_secs(2), |e| matches!(
e,
Event::RunEnd {
result: protocol::RunResult::RolledBack
}
))
.await,
"expected RunEnd::RolledBack after empty pause"
);
wait_for_status(&handle, PodStatus::Idle).await;
assert!(
!snapshot_contains_user_input(&handle, "pause rollback").await,
"attach snapshot must not resurrect rolled-back empty turn input"
);
}
#[tokio::test]
async fn empty_turn_rollback_removes_only_the_most_recent_turn() {
let client = MockClient::sequential(vec![
MockResponse::Complete(simple_text_events()),
MockResponse::Hang(vec![]),
]);
let pod = make_pod(client).await;
let handle = spawn_controller(pod).await;
let mut rx = handle.subscribe();
handle.send(Method::run_text("first kept")).await.unwrap();
assert!(
drain_until(&mut rx, std::time::Duration::from_secs(2), |e| matches!(
e,
Event::RunEnd {
result: protocol::RunResult::Finished
}
))
.await,
"expected first run to finish"
);
wait_for_status(&handle, PodStatus::Idle).await;
handle
.send(Method::run_text("second rolled back"))
.await
.unwrap();
wait_for_status(&handle, PodStatus::Running).await;
handle.send(Method::Cancel).await.unwrap();
assert!(
drain_until(&mut rx, std::time::Duration::from_secs(2), |e| matches!(
e,
Event::RunEnd {
result: protocol::RunResult::RolledBack
}
))
.await,
"expected empty second run to roll back"
);
let history = history_from_sink(&handle);
assert!(
history
.iter()
.any(|item| item_text_contains(item, "first kept"))
);
assert!(
history
.iter()
.any(|item| item_text_contains(item, "Hello World"))
);
assert!(
!history
.iter()
.any(|item| item_text_contains(item, "second rolled back")),
"rollback must affect only the most recent empty turn: {history:?}"
);
}
#[tokio::test]
async fn pause_after_assistant_token_does_not_rollback() {
let client = MockClient::sequential(vec![MockResponse::Hang(vec![
LlmEvent::text_block_start(0),
LlmEvent::text_delta(0, "committed before pause"),
LlmEvent::text_block_stop(0, None),
])]);
let pod = make_pod(client).await;
let handle = spawn_controller(pod).await;
let mut rx = handle.subscribe();
handle
.send(Method::run_text("keep this turn"))
.await
.unwrap();
assert!(
drain_until(&mut rx, std::time::Duration::from_secs(2), |e| matches!(
e,
Event::TextDone { .. }
))
.await,
"assistant token should be visible before pause"
);
handle.send(Method::Pause).await.unwrap();
assert!(
drain_until(&mut rx, std::time::Duration::from_secs(2), |e| matches!(
e,
Event::RunEnd {
result: protocol::RunResult::Paused
}
))
.await,
"pause after assistant output must keep the existing Paused path"
);
wait_for_status(&handle, PodStatus::Paused).await;
let history = history_from_sink(&handle);
assert!(
history
.iter()
.any(|item| item_text_contains(item, "keep this turn")),
"token-visible turn must keep its UserInput entry: {history:?}"
);
}

View File

@ -579,7 +579,7 @@ async fn restored_registry_uses_pod_state_without_runtime_file() {
}
#[tokio::test]
async fn load_from_pod_state_prunes_children_with_missing_sockets() {
async fn load_from_pod_state_prunes_runtime_children_but_preserves_durable_state() {
let runtime_tmp = TempDir::new().unwrap();
let store_tmp = TempDir::new().unwrap();
let store = FsStore::new(store_tmp.path()).unwrap();
@ -615,12 +615,23 @@ async fn load_from_pod_state_prunes_children_with_missing_sockets() {
.read_by_name("spawner")
.unwrap()
.expect("spawner metadata should be written");
assert_eq!(metadata.spawned_children.len(), 1);
assert_eq!(metadata.spawned_children[0].pod_name, "alive");
assert_eq!(metadata.spawned_children.len(), 2);
assert!(
metadata
.spawned_children
.iter()
.any(|c| c.pod_name == "alive")
);
assert!(
metadata
.spawned_children
.iter()
.any(|c| c.pod_name == "missing")
);
}
#[tokio::test]
async fn load_from_pod_state_reclaims_pruned_child_scope_and_registry_deny() {
async fn load_from_pod_state_reclaims_pruned_child_scope_without_deleting_pod_state() {
let _env = EnvGuard::acquire();
let runtime_tmp = TempDir::new().unwrap();
let store_tmp = TempDir::new().unwrap();
@ -705,7 +716,8 @@ async fn load_from_pod_state_reclaims_pruned_child_scope_and_registry_deny() {
.read_by_name("spawner")
.unwrap()
.expect("spawner metadata should remain");
assert!(metadata.spawned_children.is_empty());
assert_eq!(metadata.spawned_children.len(), 1);
assert_eq!(metadata.spawned_children[0].pod_name, "missing");
let runtime_contents = std::fs::read_to_string(rd.path().join("spawned_pods.json")).unwrap();
let runtime_records: Vec<SpawnedPodRecord> = serde_json::from_str(&runtime_contents).unwrap();
assert!(runtime_records.is_empty());

View File

@ -43,6 +43,18 @@ pub enum Method {
kind: CompletionKind,
prefix: String,
},
/// List Pods visible to this Pod from durable Pod state. This is not a
/// host-wide Pod universe query.
ListVisiblePods,
/// Inspect one Pod by name if its state exists and it is visible to this Pod.
InspectPod {
name: String,
},
/// Attach to a visible live Pod, or restore it from durable Pod state when
/// it is not live. Missing state and not-visible state are distinct errors.
AttachOrRestorePod {
name: String,
},
}
/// Typed lifecycle events sent from a child Pod to its parent.
@ -393,6 +405,20 @@ pub enum Event {
kind: CompletionKind,
entries: Vec<CompletionEntry>,
},
/// Reply to `Method::ListVisiblePods`. Payload is a stable JSON value so
/// the Pod crate can evolve discovery fields without introducing a protocol
/// dependency on session-store.
VisiblePods {
pods: serde_json::Value,
},
/// Reply to `Method::InspectPod`.
PodInspection {
pod: serde_json::Value,
},
/// Reply to `Method::AttachOrRestorePod`.
PodAttachRestore {
result: serde_json::Value,
},
Alert(Alert),
/// Pod has started compacting the current session.
///
@ -545,6 +571,11 @@ pub enum RunResult {
Finished,
Paused,
LimitReached,
/// The accepted `Method::Run` produced no assistant/tool output before
/// user interruption, so the Pod rolled the submit-time turn state back
/// to its pre-submit snapshot. Clients should treat the Pod as Idle and
/// restore the just-submitted input into the editable composer if desired.
RolledBack,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
@ -1215,4 +1246,59 @@ mod tests {
other => panic!("expected UserMessage, got {other:?}"),
}
}
#[test]
fn pod_discovery_methods_roundtrip() {
let methods = [
Method::ListVisiblePods,
Method::InspectPod {
name: "child".into(),
},
Method::AttachOrRestorePod {
name: "child".into(),
},
];
for method in methods {
let json = serde_json::to_string(&method).unwrap();
let decoded: Method = serde_json::from_str(&json).unwrap();
match (decoded, method) {
(Method::ListVisiblePods, Method::ListVisiblePods)
| (Method::InspectPod { .. }, Method::InspectPod { .. })
| (Method::AttachOrRestorePod { .. }, Method::AttachOrRestorePod { .. }) => {}
(decoded, expected) => panic!("decoded {decoded:?}, expected {expected:?}"),
}
}
}
#[test]
fn pod_discovery_events_roundtrip() {
let events = [
Event::VisiblePods {
pods: serde_json::json!([{ "pod_name": "child" }]),
},
Event::PodInspection {
pod: serde_json::json!({ "pod_name": "child" }),
},
Event::PodAttachRestore {
result: serde_json::json!({ "action": "attach" }),
},
];
for event in events {
let json = serde_json::to_string(&event).unwrap();
let decoded: Event = serde_json::from_str(&json).unwrap();
match (decoded, event) {
(Event::VisiblePods { pods }, Event::VisiblePods { pods: expected }) => {
assert_eq!(pods, expected)
}
(Event::PodInspection { pod }, Event::PodInspection { pod: expected }) => {
assert_eq!(pod, expected)
}
(
Event::PodAttachRestore { result },
Event::PodAttachRestore { result: expected },
) => assert_eq!(result, expected),
(decoded, expected) => panic!("decoded {decoded:?}, expected {expected:?}"),
}
}
}
}
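Each discovery method has one documented reply event. A client can encode that pairing so that an unexpected reply is caught early. The sketch below uses reduced copies of the enums without serde; `ReplyKind`, `expected_reply`, and `target_name` are hypothetical helpers, not part of the protocol crate.

```rust
/// Reduced copy of the three discovery methods.
#[derive(Debug, Clone, PartialEq)]
enum Method {
    ListVisiblePods,
    InspectPod { name: String },
    AttachOrRestorePod { name: String },
}

/// Which reply event a client should wait for.
#[derive(Debug, PartialEq)]
enum ReplyKind {
    VisiblePods,
    PodInspection,
    PodAttachRestore,
}

fn expected_reply(method: &Method) -> ReplyKind {
    match method {
        Method::ListVisiblePods => ReplyKind::VisiblePods,
        Method::InspectPod { .. } => ReplyKind::PodInspection,
        Method::AttachOrRestorePod { .. } => ReplyKind::PodAttachRestore,
    }
}

/// The Pod name a method targets, if it targets one.
fn target_name(method: &Method) -> Option<&str> {
    match method {
        Method::ListVisiblePods => None,
        Method::InspectPod { name } | Method::AttachOrRestorePod { name } => Some(name.as_str()),
    }
}
```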

View File

@ -123,6 +123,35 @@ impl PodMetadataStore for FsStore {
Ok(Some(serde_json::from_str(&content)?))
}
fn list_names(&self) -> Result<Vec<String>, StoreError> {
let dir = self.pods_dir();
let mut names = Vec::new();
if !dir.exists() {
return Ok(names);
}
for entry in fs::read_dir(dir)? {
let entry = entry?;
if !entry.file_type()?.is_dir() {
continue;
}
if !entry.path().join("metadata.json").exists() {
continue;
}
let Some(name) = entry.file_name().to_str().map(ToOwned::to_owned) else {
continue;
};
if validate_pod_name(&name).is_ok() {
names.push(name);
}
}
names.sort();
Ok(names)
}
fn root_dir(&self) -> Option<PathBuf> {
Some(self.root.clone())
}
fn delete_by_name(&self, pod_name: &str) -> Result<(), StoreError> {
let path = self.pod_metadata_path(pod_name)?;
match fs::remove_file(&path) {

View File

@ -92,6 +92,16 @@ pub trait PodMetadataStore: Send + Sync {
/// Read metadata by Pod name. Returns `None` when no metadata exists.
fn read_by_name(&self, pod_name: &str) -> Result<Option<PodMetadata>, StoreError>;
/// List persisted Pod metadata keys. Implementations return names only;
/// callers can then read each item independently so a corrupt metadata
/// file does not make the whole discovery result fail.
fn list_names(&self) -> Result<Vec<String>, StoreError>;
/// Return the metadata root directory when this backend is path-backed.
fn root_dir(&self) -> Option<PathBuf> {
None
}
/// Delete metadata by Pod name. Missing metadata is a successful no-op.
fn delete_by_name(&self, pod_name: &str) -> Result<(), StoreError>;
}
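The `list_names` doc comment explains why listing returns names only: each item is read separately, so one corrupt file cannot fail the whole listing. A small in-memory sketch of that pattern follows. `MemStore` and `discover` are hypothetical, and a `u32` parse stands in for metadata deserialization.

```rust
use std::collections::BTreeMap;

/// Hypothetical in-memory store: Pod name -> raw metadata text.
struct MemStore {
    raw: BTreeMap<String, String>,
}

impl MemStore {
    /// Names only; cheap and never fails on bad content.
    fn list_names(&self) -> Vec<String> {
        self.raw.keys().cloned().collect()
    }

    /// Parsing happens per item, so errors stay local to one name.
    fn read_by_name(&self, name: &str) -> Result<Option<u32>, String> {
        match self.raw.get(name) {
            None => Ok(None),
            Some(text) => text
                .trim()
                .parse::<u32>()
                .map(Some)
                .map_err(|e| format!("corrupt metadata for {name}: {e}")),
        }
    }
}

/// Collect one result per name instead of failing on the first error.
fn discover(store: &MemStore) -> Vec<(String, Result<Option<u32>, String>)> {
    store
        .list_names()
        .into_iter()
        .map(|name| {
            let result = store.read_by_name(&name);
            (name, result)
        })
        .collect()
}
```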

View File

@ -82,6 +82,33 @@ pub trait Store: Send + Sync {
/// Check if a segment exists.
fn exists(&self, session_id: SessionId, segment_id: SegmentId) -> Result<bool, StoreError>;
/// Truncate a segment log to `entries_len` entries.
///
/// Used by Pod's submit-time empty-turn rollback after it has proven
/// that no LLM output from the accepted turn was materialized. The
/// default implementation rewrites the retained prefix through
/// `create_segment`, matching the append-only logical model while still
/// allowing concrete stores to provide a more direct truncate.
fn truncate(
&self,
session_id: SessionId,
segment_id: SegmentId,
entries_len: usize,
) -> Result<(), StoreError> {
let mut entries = self.read_all(session_id, segment_id)?;
if entries_len > entries.len() {
return Err(StoreError::Corrupt {
line: entries_len,
message: format!(
"cannot truncate segment {segment_id} to {entries_len} entries; only {} entries stored",
entries.len()
),
});
}
entries.truncate(entries_len);
self.create_segment(session_id, segment_id, &entries)
}
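The default `truncate` is built from two required operations, so every store gets a working truncate and may override it with something more direct. The sketch below reproduces that pattern with a hypothetical `SegmentStore` trait and an in-memory `MemSegment`; session and segment ids are omitted and errors are plain strings.

```rust
use std::cell::RefCell;

trait SegmentStore {
    fn read_all(&self) -> Vec<String>;
    fn create_segment(&self, entries: &[String]);

    /// Default truncate: read everything, keep the prefix, rewrite it.
    fn truncate(&self, entries_len: usize) -> Result<(), String> {
        let mut entries = self.read_all();
        if entries_len > entries.len() {
            return Err(format!(
                "cannot truncate to {entries_len} entries; only {} stored",
                entries.len()
            ));
        }
        entries.truncate(entries_len);
        self.create_segment(&entries);
        Ok(())
    }
}

/// Hypothetical in-memory segment used only for this sketch.
struct MemSegment {
    entries: RefCell<Vec<String>>,
}

impl SegmentStore for MemSegment {
    fn read_all(&self) -> Vec<String> {
        self.entries.borrow().clone()
    }

    fn create_segment(&self, entries: &[String]) {
        *self.entries.borrow_mut() = entries.to_vec();
    }
}
```

Rejecting a target length larger than the stored length keeps truncate from silently acting as a no-op when the caller's bookkeeping has drifted from the store.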
/// Count entries currently stored for a segment.
///
/// Used by `ensure_head_or_fork` to detect concurrent writers:

View File

@ -250,6 +250,7 @@ fn pod_metadata_minimal_crud() {
let pending = PodMetadata::new(pod_name, Some(PodActiveSegmentRef::pending_segment(sid)));
store.write(&pending).unwrap();
assert_eq!(store.list_names().unwrap(), vec![pod_name.to_string()]);
assert_eq!(store.read_by_name(pod_name).unwrap(), Some(pending.clone()));
assert!(
dir.path()

View File

@ -40,6 +40,13 @@ impl CompletionState {
pub const MAX_VISIBLE: usize = 6;
}
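/// What the TUI needs to undo a locally visible submit if the Pod reports
/// `RunResult::RolledBack`.
struct RollbackSubmitState {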
text: String,
segments: Vec<Segment>,
block_start: usize,
turn_before: usize,
}
pub struct App {
pub pod_name: String,
pub connected: bool,
@ -91,6 +98,12 @@ pub struct App {
/// Top entry index of the task pane's visible window. Clamped on
/// render so it never points past the end of the list.
pub task_pane_scroll: usize,
/// Local submit state kept until the accepted run either completes
/// normally or reports that the empty assistant turn was rolled back.
pending_submit_rollback: Option<RollbackSubmitState>,
/// Last rolled-back submit that could not be restored because the
/// composer already contained unsent user input.
last_rolled_back_input: Option<Vec<Segment>>,
}
impl App {
@ -120,6 +133,8 @@ impl App {
task_store: TaskStore::new(),
task_pane_open: false,
task_pane_scroll: 0,
pending_submit_rollback: None,
last_rolled_back_input: None,
}
}
@ -339,7 +354,15 @@ impl App {
// TurnHeader / UserMessage blocks are pushed in response to
// `Event::UserMessage` (single source of truth, shared by every
// client subscribed to the Pod). Locally we only clear the
// input buffer and forward the method.
// input buffer and forward the method, while remembering enough
// local state to undo the visible submit if the Pod reports that
// the accepted run produced no assistant output and was rolled back.
self.pending_submit_rollback = Some(RollbackSubmitState {
text: Segment::flatten_to_text(&segments),
segments: segments.clone(),
block_start: self.blocks.len(),
turn_before: self.turn_index,
});
self.input.clear();
Some(Method::Run { input: segments })
}
@ -670,20 +693,22 @@ impl App {
self.push_error(format!("[{code:?}] {message}"));
}
Event::RunEnd { result } => {
self.blocks.push(Block::TurnStats {
requests: self.run_requests,
upload_tokens: self.run_upload_tokens,
output_tokens: self.run_output_tokens,
});
self.set_pod_status(match result {
RunResult::Paused => PodStatus::Paused,
RunResult::Finished | RunResult::LimitReached => PodStatus::Idle,
});
self.run_requests = 0;
self.run_upload_tokens = 0;
self.run_output_tokens = 0;
self.current_tool = None;
self.assistant_streaming = false;
if matches!(result, RunResult::RolledBack) {
self.handle_rolled_back_run();
} else {
self.blocks.push(Block::TurnStats {
requests: self.run_requests,
upload_tokens: self.run_upload_tokens,
output_tokens: self.run_output_tokens,
});
self.pending_submit_rollback = None;
self.reset_run_state(match result {
RunResult::Paused => PodStatus::Paused,
RunResult::Finished | RunResult::LimitReached | RunResult::RolledBack => {
PodStatus::Idle
}
});
}
}
Event::CompactStart => {
self.blocks.push(Block::Compact(CompactEvent::Streaming {
@ -758,6 +783,9 @@ impl App {
state.selected = 0;
}
}
Event::VisiblePods { .. }
| Event::PodInspection { .. }
| Event::PodAttachRestore { .. } => {}
Event::Shutdown => {
self.mark_orphan_compacts_incomplete();
self.quit = true;
@ -765,6 +793,44 @@ impl App {
}
}
/// Set the Pod status and clear per-run counters and streaming state.
fn reset_run_state(&mut self, status: PodStatus) {
self.set_pod_status(status);
self.run_requests = 0;
self.run_upload_tokens = 0;
self.run_output_tokens = 0;
self.current_tool = None;
self.assistant_streaming = false;
}
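/// Undo the visible submit: drop its blocks, restore the turn index, and
/// put the input back into the composer unless it already holds new text.
fn handle_rolled_back_run(&mut self) {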
let hint = if let Some(state) = self.pending_submit_rollback.take() {
self.blocks
.truncate(state.block_start.min(self.blocks.len()));
self.turn_index = state.turn_before;
if self.input.is_empty() {
self.input.replace_with_segments(&state.segments);
self.completion = None;
self.last_rolled_back_input = None;
"Rolled back empty assistant turn; restored your input.".to_owned()
} else {
let preview = rollback_input_preview(&state.text);
self.last_rolled_back_input = Some(state.segments);
format!(
"Rolled back empty assistant turn; composer was not empty, kept submitted input in backup: {preview}"
)
}
} else {
"Rolled back empty assistant turn; no local submitted input was available to restore."
.to_owned()
};
self.reset_run_state(PodStatus::Idle);
self.blocks.push(Block::Alert {
level: AlertLevel::Warn,
source: AlertSource::Pod,
message: hint,
});
}
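The restore rule in `handle_rolled_back_run` is: put the submitted input back only when the composer is empty, otherwise keep it as a backup so the new draft is not overwritten. A reduced sketch with a string-based composer follows; `Composer`, `RestoreOutcome`, and `on_rolled_back` are hypothetical names, and the real code works on `Segment` lists.

```rust
/// Hypothetical composer holding the current draft and one backup slot.
#[derive(Debug, Default)]
struct Composer {
    input: String,
    backup: Option<String>,
}

#[derive(Debug, PartialEq)]
enum RestoreOutcome {
    Restored,
    KeptAsBackup,
}

impl Composer {
    /// Never overwrite text the user typed while the run was in flight.
    fn on_rolled_back(&mut self, submitted: String) -> RestoreOutcome {
        if self.input.is_empty() {
            self.input = submitted;
            self.backup = None;
            RestoreOutcome::Restored
        } else {
            self.backup = Some(submitted);
            RestoreOutcome::KeptAsBackup
        }
    }
}
```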
fn append_assistant_text(&mut self, text: &str) {
if self.assistant_streaming {
if let Some(Block::AssistantText { text: existing }) = self.blocks.last_mut() {
@ -1093,6 +1159,16 @@ fn strip_cat_n_prefix(formatted: &str) -> String {
out
}
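/// One-line preview of rolled-back input, capped at `MAX_CHARS` characters
/// with a trailing ellipsis when truncated.
fn rollback_input_preview(text: &str) -> String {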
const MAX_CHARS: usize = 80;
// Flatten newlines to spaces so adjacent lines do not run together.
let mut one_line = text.replace('\n', " ");
if one_line.chars().count() > MAX_CHARS {
one_line = one_line.chars().take(MAX_CHARS).collect::<String>();
one_line.push('…');
}
one_line
}
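The preview helper truncates by `chars()` rather than by byte length, which avoids cutting a multi-byte UTF-8 sequence in half. A standalone sketch with the limit as a parameter is shown below; `preview` is a hypothetical variant, and flattening newlines to spaces is an assumption of this sketch.

```rust
/// Build a one-line preview of at most `max_chars` characters.
/// Counts Unicode scalar values, not bytes and not grapheme clusters.
fn preview(text: &str, max_chars: usize) -> String {
    // Assumption for this sketch: newlines become single spaces.
    let one_line = text.replace('\n', " ");
    if one_line.chars().count() <= max_chars {
        return one_line;
    }
    let mut cut: String = one_line.chars().take(max_chars).collect();
    cut.push('…');
    cut
}
```

Slicing with `&text[..n]` would panic when `n` falls inside a multi-byte character; collecting from `chars().take(n)` cannot.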
/// True if the submitted segment list carries no user-visible content
/// (only whitespace / newlines, no paste, no typed atoms). Used to
/// decide whether an empty Enter should be a no-op or trigger a
@ -1434,6 +1510,133 @@ mod completion_flow_tests {
assert!(app.completion.as_ref().unwrap().entries.is_empty());
}
#[test]
fn rolled_back_run_restores_input_and_removes_submit_blocks() {
let mut app = App::new("test".into());
let submitted = submit_text(&mut app, "please wait");
assert_eq!(input_text(&app), "");
app.handle_pod_event(Event::UserMessage {
segments: submitted,
});
// Simulate run-derived attachment display after the submitted user line.
app.blocks.push(Block::SystemMessage {
text: "[File: README.md]".into(),
});
app.handle_pod_event(Event::TurnStart { turn: 1 });
app.handle_pod_event(Event::Usage {
input_tokens: Some(100),
output_tokens: Some(0),
cache_read_input_tokens: Some(40),
});
app.handle_pod_event(Event::RunEnd {
result: RunResult::RolledBack,
});
assert_eq!(input_text(&app), "please wait");
assert_eq!(app.turn_index, 0);
assert!(app.blocks.iter().all(|b| !matches!(
b,
Block::TurnHeader { .. }
| Block::UserMessage { .. }
| Block::SystemMessage { .. }
| Block::TurnStats { .. }
)));
assert!(warning_contains(&app, "restored your input"));
assert!(matches!(app.pod_status, PodStatus::Idle));
assert!(!app.running);
assert!(!app.paused);
assert_eq!(app.run_requests, 0);
assert_eq!(app.run_upload_tokens, 0);
assert_eq!(app.run_output_tokens, 0);
assert!(app.current_tool.is_none());
}
#[test]
fn rolled_back_run_does_not_overwrite_existing_unsent_input() {
let mut app = App::new("test".into());
let submitted = submit_text(&mut app, "original submit");
app.handle_pod_event(Event::UserMessage {
segments: submitted,
});
for c in "draft while running".chars() {
app.insert_char(c);
}
app.handle_pod_event(Event::RunEnd {
result: RunResult::RolledBack,
});
assert_eq!(input_text(&app), "draft while running");
assert_eq!(
Segment::flatten_to_text(app.last_rolled_back_input.as_ref().unwrap()),
"original submit"
);
assert!(warning_contains(&app, "composer was not empty"));
assert!(app.blocks.iter().all(|b| !matches!(
b,
Block::TurnHeader { .. } | Block::UserMessage { .. } | Block::TurnStats { .. }
)));
}
#[test]
fn non_rolled_back_run_end_keeps_submitted_blocks_and_does_not_restore_input() {
for result in [RunResult::Paused, RunResult::Finished] {
let mut app = App::new("test".into());
let submitted = submit_text(&mut app, "normal run");
app.handle_pod_event(Event::UserMessage {
segments: submitted,
});
app.handle_pod_event(Event::RunEnd { result });
assert_eq!(input_text(&app), "");
assert!(
app.blocks
.iter()
.any(|b| matches!(b, Block::TurnHeader { .. }))
);
assert!(
app.blocks
.iter()
.any(|b| matches!(b, Block::UserMessage { .. }))
);
assert!(
app.blocks
.iter()
.any(|b| matches!(b, Block::TurnStats { .. }))
);
assert!(!warning_contains(&app, "Rolled back empty assistant turn"));
assert!(app.last_rolled_back_input.is_none());
}
}
fn submit_text(app: &mut App, text: &str) -> Vec<Segment> {
for c in text.chars() {
app.insert_char(c);
}
match app.submit_input() {
Some(Method::Run { input }) => input,
other => panic!("expected Run, got {other:?}"),
}
}
fn input_text(app: &App) -> String {
Segment::flatten_to_text(&app.input.submit_segments())
}
fn warning_contains(app: &App, needle: &str) -> bool {
app.blocks.iter().any(|block| {
matches!(
block,
Block::Alert {
level: AlertLevel::Warn,
message,
..
} if message.contains(needle)
)
})
}
#[test]
fn snapshot_renders_system_message_block_from_session_start() {
let mut app = App::new("test".into());


@ -168,6 +168,56 @@ impl InputBuffer {
self.cursor = 0;
}
pub fn is_empty(&self) -> bool {
self.atoms.is_empty()
}
/// Replace the whole composer with protocol segments previously emitted
/// by [`submit_segments`](Self::submit_segments), preserving typed chips
/// and placing the cursor at the end of the restored input.
pub fn replace_with_segments(&mut self, segments: &[protocol::Segment]) {
self.atoms.clear();
for segment in segments {
match segment {
protocol::Segment::Text { content } => {
self.atoms.extend(content.chars().map(Atom::Char));
}
protocol::Segment::Paste {
id,
chars,
lines,
content,
} => {
self.next_paste_id = self.next_paste_id.max(id.saturating_add(1).max(1));
self.atoms.push(Atom::Paste(PasteRef {
id: *id,
chars: *chars as usize,
lines: *lines as usize,
content: content.clone(),
}));
}
protocol::Segment::FileRef { path } => {
self.atoms
.push(Atom::FileRef(FileRefAtom { path: path.clone() }));
}
protocol::Segment::KnowledgeRef { slug } => {
self.atoms
.push(Atom::KnowledgeRef(KnowledgeRefAtom { slug: slug.clone() }));
}
protocol::Segment::WorkflowInvoke { slug } => {
self.atoms.push(Atom::WorkflowInvoke(WorkflowInvokeAtom {
slug: slug.clone(),
}));
}
protocol::Segment::Unknown => {
self.atoms
.extend("[unknown input segment]".chars().map(Atom::Char));
}
}
}
self.cursor = self.atoms.len();
}
pub fn insert_char(&mut self, c: char) {
self.atoms.insert(self.cursor, Atom::Char(c));
self.cursor += 1;


@ -0,0 +1,55 @@
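# Pod/TUI: Manual rollback entry point
## Background
With `pod-empty-turn-rollback` / `tui-empty-turn-restore`, an interrupted turn with zero AI-side output is now rolled back automatically on the Pod side, and the TUI can restore the input.
The rollback substrate is starting to land in a form that can restore the state of the immediately preceding Run, but there is still no way for the user to request a rollback explicitly. After an accidental submit, a wrong model selection, or a change of direction partway through, the user may need a way to return manually to the previous state.
The detailed UX, rollback scope, and safety policy are undecided, so this ticket only records the request; the implementation approach will be settled when work starts.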
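## Requirement notes
- Provide a way for the user to request a rollback explicitly.
  - Whether it lives in a TUI system command, a keybinding, a tool, or a protocol Method is undecided.
  - Rolling back the most recent turn from the TUI is the first candidate.
- Decide the rollback scope.
  - Only the most recent submit?
  - Are turns that contain assistant output allowed?
  - Are turns that contain a tool call / tool result allowed?
  - For multi-turn rollback, check the relationship with `pod-session-fork`.
- Decide the safety policy.
  - Should confirmation be required when user-visible assistant output would be erased?
  - Can a turn whose tool side effects have already happened be rolled back, or should the user be steered to a fork instead of erasing it from history?
  - Rollback must not break the history/context persistence principles.
- Decide the TUI presentation.
  - Notification of rollback success / failure.
  - Handling of the removed blocks.
  - Whether the rolled-back input goes back into the composer or into history/backup.
- Sort out the protocol signal.
  - Can the existing `RunResult::RolledBack` be reused?
  - Does manual rollback need a dedicated Event / Method rather than RunEnd?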
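## Completion criteria (details undecided)
- A rollback can be requested by a manual action.
- On a successful rollback, the Pod session log, the SegmentLogSink mirror, and the TUI display are consistent.
- On a failed rollback, the reason is visible to the user.
- The handling of turns that contain tool side effects or assistant output is stated explicitly in the spec.
- Tests exist.
  - `cargo fmt --check`
  - `cargo check --workspace`
  - Tests for the related crates.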
## Out of scope
- Multi-turn rollback / a full redo from a past point (to be coordinated with `pod-session-fork`)
- Rollback history stack
- Undo of tool side effects
- Fork tree visualization
## Related
- `tickets/pod-session-fork.md`
- Done: `pod-empty-turn-rollback`
- Done: `tui-empty-turn-restore`


@ -1,78 +0,0 @@
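# Pod: Discovery of past Pods and a restore tool
## Background
Now that Pod state persistence and `--pod <name>` resume have landed, a Pod whose name is known can be restored. However, there is still no way for the AI / operator to check mechanically which Pods existed in the past, whether a Pod with a given name can be restored, and whether it can be attached live or needs a restore.
The current `ListPods` is mainly a tool for inspecting the live/runtime registry of spawned children known to the spawner. It is insufficient both for discovering all persisted Pods and as an entry point for restoring past Pods. To make resuming work per Pod a viable workflow, we need a tool surface that can list, inspect, and restore past Pods with Pod state as the source of truth.
That said, the design must not make every Pod state on the host visible to any Pod / LLM. Pod management tools need a capability / visibility scope, so that a caller can list and operate on only the Pods it is permitted to see.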
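## Requirements
- Add a tool / protocol API that returns, from persisted Pod state, the list of known Pods within the caller's visibility scope.
  - The actual Method / tool names are decided at implementation time.
  - The `session-store` Pod state backend/FsStore is the source of truth; the runtime dir's `spawned_pods.json` is not.
  - A Pod whose state is corrupt, or whose active segment is undetermined, is reported as a per-item status rather than failing the whole call.
  - Pods that are not visible to the caller are omitted from the listing.
- Design the Pod visibility control.
  - At minimum, handle "children spawned by the current parent" and "explicitly specified Pod names".
  - Do not make it an admin/global tool that returns every Pod on the host unconditionally.
  - Base visibility on one of Pod state / parent-child registry / manifest capability / explicit user selection; the choice is made at implementation time.
  - detail / restore / attach on a non-visible Pod returns not visible / forbidden, distinct from state missing.
- Each list item includes at least the following.
  - `pod_name`
  - active `SessionId` / `SegmentId` (or the fact that it is undetermined)
  - whether the live socket / runtime is reachable
  - whether a restore is possible, and the name needed to restore
  - if spawned children are persisted, a summary of them (count, reachable status; detailed expansion may be a separate API)
- Provide an API that returns details for a given Pod name.
  - active pointer
  - restoreability
  - live attach availability
  - summary of the spawned child registry
  - errors that distinguish unreadable state / vanished socket / lock conflict
- Provide a tool path that starts a restore / attach for a given Pod name.
  - If the live socket is reachable, treat the request as an attach.
  - If it is unreachable but Pod state exists, restore through the existing `--pod <name>` / `Pod::restore_from_pod_metadata(...)` path.
  - Whether specifying a name with no Pod state creates a new Pod or returns an explicit error must be decided per API, not left ambiguous. For discovery / restore tools, defaulting to an error is preferable, to avoid unintended creation.
- Do not mix responsibilities with the existing `--pod <name>` / `--session <UUID>` / `ListPods` / `SendToPod` / `StopPod` for spawned children.
  - `ListPods` may be kept for viewing the registry of currently connected spawned children.
  - The past-Pod discovery API uses Pod state as the source of truth.
  - Prevention of double live writers, scope delegation, and session locking stay with the existing registry / lock; no lock responsibility is added to Pod state.
- Information returned to the LLM as a tool result must remain in the normal tool call history; do not implement it as a context injection that leaves no trace in history.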
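The attach / restore branching and the "not visible vs. state missing" distinction described above can be sketched as a small decision function. This is only an illustration under stated assumptions: `PodProbe`, `StateProbe`, `PodAction`, and `decide` are hypothetical names that do not appear in the ticket, and placing the lock check just before the restore branch is one possible reading of the requirements.

```rust
/// Hypothetical result of probing persisted Pod state for one name.
#[derive(Debug, Clone, Copy, PartialEq)]
enum StateProbe {
    Ok,
    Missing,
    Corrupt,
}

/// Hypothetical inputs gathered before deciding what to do with a Pod name.
#[derive(Debug, Clone, Copy)]
struct PodProbe {
    visible: bool,
    socket_reachable: bool,
    state: StateProbe,
    lock_held_by_other: bool,
}

/// Hypothetical outcome; each error case stays distinguishable.
#[derive(Debug, Clone, Copy, PartialEq)]
enum PodAction {
    Attach,
    Restore,
    NotVisible,
    StateMissing,
    StateCorrupt,
    LockConflict,
}

fn decide(probe: &PodProbe) -> PodAction {
    // Visibility is checked first so that a hidden Pod is never reported
    // as "state missing" (and its existence is not leaked).
    if !probe.visible {
        return PodAction::NotVisible;
    }
    // A reachable live socket is treated as an attach.
    if probe.socket_reachable {
        return PodAction::Attach;
    }
    match probe.state {
        // Assumption: default to an error rather than creating a new Pod.
        StateProbe::Missing => PodAction::StateMissing,
        StateProbe::Corrupt => PodAction::StateCorrupt,
        // Never start a second writer while another process holds the lock.
        StateProbe::Ok if probe.lock_held_by_other => PodAction::LockConflict,
        StateProbe::Ok => PodAction::Restore,
    }
}
```

Returning a plain enum per Pod also fits the per-item status requirement: a listing can call `decide` for each name and report the result without failing the whole call.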
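## Completion criteria
- Among persisted Pods, only those within the caller's visibility scope can be listed from Pod state.
- Visible Pods can be discovered from Pod state even when the runtime dir's `spawned_pods.json` does not exist.
- Details fetched by Pod name distinguish live-attachable / restorable / state missing / state corrupt / lock conflict.
- The restore / attach tool, given a Pod name, attaches to a reachable live Pod and restores an unreachable Pod that has state through the existing restore path.
- The existing behavior of `ListPods` / `SendToPod` / `StopPod` / `--pod` / `--session` is not broken.
- Unit / integration tests verify the following.
  - Listing multiple Pod metadata entries (visible Pods only)
  - Non-visible Pods are not listed, and detail / restore / attach distinguish them from state missing
  - Display of Pods whose active segment is undetermined
  - Discovery from Pod state even after the runtime file is gone
  - Socket reachability is reflected
  - The restore / attach branch
  - No second writer is started on a lock conflict
- `cargo fmt --check`
- `cargo check --workspace`
- Tests for the related crates (at least `cargo test -p pod`; add more depending on the crate that hosts the tool surface)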
## Out of scope
- A full UI / picker for the past Pod list (the TUI-side Pod picker is handled in `tickets/tui-pod-restore-picker.md`)
- Fork tree visualization
- Full-text search / semantic search over transcripts
- Automatic Pod restart
- GC / retention policy for old Pod state
- New resume arguments at session / segment granularity
## Dependencies / Related
- Pod state backend / FsStore implementation
- Pod lifecycle write-through
- Resume / attach path by Pod name
- Persistence and restoration of SpawnedPodRegistry


@ -1,43 +0,0 @@
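# Pod: Automatic rollback of empty-response turns
## Background
After the user submits through `Method::Run`, if the run is paused / cancelled before any AI response has been appended to history, the Pod currently commits the following state:
- the user message and the attachment-derived system messages remain in `worker.history`
- the input has already been pushed onto `pod.user_segments`
- on the `session_store` side, the `save_user_input` delta has been committed, followed by `save_delta` (in practice empty or attachments only) / `save_turn_end` / `save_run_completed(interrupted=true)`, so head_hash advances
As a result, a turn with only a user utterance and zero AI response is committed to history, and the next Run stacks on top of it. Every time the user cancels a submit to rewrite it or to switch topics, noise is left behind.
This case is frequent in the human-operated TUI, so we want a mechanism that rolls everything back to the state before the submit.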
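## Requirements
- In the Pod controller's turn handling, roll back to the state immediately before the submit when a turn originating from `Method::Run` satisfies **both** of the following:
  - The turn ended because of a Pause / Cancel (the path that exits with `WorkerError::Cancelled` after receiving `Method::Pause` / `Method::Cancel`)
  - **No** entry derived from an LLM response (assistant message / tool_use / tool_result) was appended to `worker.history` during the turn
- The rollback undoes every change that originated from the submit:
  - truncate `worker.history` to its pre-submit length
  - pop the pushed entries from `pod.user_segments`
  - discard `pending_attachments`
  - move the `session_store` head_hash back to just before the submit, cancelling out the `save_user_input` / `save_delta` / `save_turn_end` / `save_run_completed` commits
  - sync the persisted `history.json` / `status` through `runtime_dir.write_history` / `write_status`
- When a rollback takes effect, the final status is **Idle** (there is no AI turn to resume, so it is not Paused)
- Whether a rollback happened is conveyed to clients through an event (an extended `Event::RunEnd` variant, or a dedicated event). This lets the TUI build feedback such as "the turn was rolled back, so the text was put back into the input box".
- Only turns originating from `Method::Run` are covered. Notify-originated automatic Runs (`run_for_notification`) and `Method::Resume` are excluded.
- The case where a resume after mid-run compaction yields zero LLM response (the `do_compact_and_resume` path) should be designed so that it can be rolled back the same way, as long as the pre-submit history snapshot is still valid.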
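The rollback condition and the in-memory part of the undo listed above can be sketched roughly as follows. This is a minimal sketch, not the Pod controller's actual code: `Entry`, `Origin`, `EndCause`, `PodState`, and `SubmitSnapshot` are hypothetical stand-ins, and the `session_store` / `runtime_dir` side is left out entirely.

```rust
/// Hypothetical, simplified history entry; the real item type is not shown in this ticket.
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
enum Entry {
    User(String),
    System(String),
    Assistant(String),
    ToolUse(String),
    ToolResult(String),
}

/// What started the turn (hypothetical).
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq)]
enum Origin {
    Run,
    Notify,
    Resume,
}

/// How the turn ended (hypothetical).
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq)]
enum EndCause {
    Finished,
    Paused,
    Cancelled,
}

#[derive(Debug, Default)]
struct PodState {
    history: Vec<Entry>,
    user_segments: Vec<String>,
    pending_attachments: Vec<String>,
}

/// Lengths recorded immediately before the submit is applied.
#[derive(Debug, Clone, Copy)]
struct SubmitSnapshot {
    history_len: usize,
    user_segments_len: usize,
}

impl PodState {
    fn snapshot(&self) -> SubmitSnapshot {
        SubmitSnapshot {
            history_len: self.history.len(),
            user_segments_len: self.user_segments.len(),
        }
    }

    /// True only for a Run-originated turn that was paused / cancelled
    /// before any LLM-derived entry was appended after the snapshot.
    fn should_roll_back(&self, snap: SubmitSnapshot, origin: Origin, cause: EndCause) -> bool {
        let interrupted = matches!(cause, EndCause::Paused | EndCause::Cancelled);
        let has_llm_entry = self.history.iter().skip(snap.history_len).any(|e| {
            matches!(e, Entry::Assistant(_) | Entry::ToolUse(_) | Entry::ToolResult(_))
        });
        origin == Origin::Run && interrupted && !has_llm_entry
    }

    /// Undo the in-memory changes made since the snapshot.
    fn roll_back(&mut self, snap: SubmitSnapshot) {
        self.history.truncate(snap.history_len);
        self.user_segments.truncate(snap.user_segments_len);
        self.pending_attachments.clear();
    }
}
```

Recording lengths rather than cloning the history keeps the snapshot cheap, but it is only valid while the history prefix is unchanged, which is why the compaction path needs the explicit "snapshot still valid" condition.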
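## Completion criteria
- Through either the TUI or pod_cli, `Method::Run` followed by a Pause / Cancel with zero AI response leaves the Pod's in-memory state (`worker.history`, `user_segments`, `pending_attachments`, status) identical to the pre-submit state
- Under the same condition, the persisted side (session_store / `history.json` / `status`) also matches the pre-submit state
- A turn in which even one AI response was appended is not rolled back and settles as Paused / Idle, as before
- Clients can tell from the events they receive whether a rollback happened
- Unit tests cover at least these four cases: "Cancel with zero assistant response", "Pause with zero assistant response", "Cancel after one tool_use (not rolled back)", and "Cancel of a Notify-originated run (excluded)"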
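## Out of scope
- Undo that reaches back over multiple turns (planned to be implemented as Fork in `tickets/pod-session-fork.md`)
- A UI that lets the user explicitly choose "roll back this turn" (automatic conditions only)
- TUI-side UX such as "restore the rolled-back input into the edit area" (→ `tickets/tui-empty-turn-restore.md`)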


@ -1,35 +0,0 @@
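# TUI: Restoring rolled-back input to the edit area
## Background
`tickets/pod-empty-turn-rollback.md` adds a Pod-side mechanism that automatically rolls back to the pre-submit state when a run is paused / cancelled with zero AI response after a submit. The Pod tells the TUI whether a rollback happened through `Event::RunEnd` (or a dedicated event).
The TUI currently calls `input.clear()` at `submit_input` time, so the text is lost after a submit. If the Pod rolls back but the TUI is left with an empty input box and the turn header still on screen, the UX breaks. In step with the Pod's rollback, the TUI must also return to an appearance equivalent to the pre-submit state.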
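## Requirements
- When a rollback notification is received from the Pod:
  - Remove from the screen the blocks rendered for the most recent submit (`Block::TurnHeader`, `Block::UserMessage`, the accompanying attachment blocks such as `[File: ...]`, and Run-derived blocks in general such as `Block::TurnStats`)
  - Restore that submit's text (typed segments) into the input box, with the cursor at the end of the text
  - Reset turn state such as `running` / `paused` / `assistant_streaming` / `current_tool` / `run_*_tokens` to its Idle equivalent
- Restore one turn only (multi-turn rollback is out of scope on the Pod side as well)
- Decide the behavior when the input box already holds unsent characters (text the user started writing during the Run, or the queue contents once tui-input-queue lands):
  - Not overwriting the existing input is the straightforward policy (another option is to keep the restored input in a separate buffer that is recalled with, for example, the ↑ key)
  - Re-check the conflict with tui-input-queue when work on this ticket starts
- Show the user a one-line hint (status line, toast, etc.) that a rollback occurred. Restoring silently tends to cause "the screen cleared and I do not know what happened" incidents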
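One way to satisfy the requirements above is to remember where the block list ended at submit time and cut back to that point when the rollback notification arrives. The sketch below is an assumption-laden illustration: `Tui`, `SubmitMark`, the plain `String` composer, and the simplified `Block` variants are hypothetical and much smaller than the real TUI state.

```rust
/// Hypothetical, simplified render blocks.
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
enum Block {
    TurnHeader,
    UserMessage(String),
    Attachment(String),
    TurnStats,
    Alert(String),
}

/// What the TUI remembers about the most recent submit.
#[derive(Debug)]
struct SubmitMark {
    blocks_len: usize,
    text: String,
}

#[derive(Debug, Default)]
struct Tui {
    blocks: Vec<Block>,
    input: String,
    backup: Option<String>,
    pending: Option<SubmitMark>,
    running: bool,
}

impl Tui {
    /// Clear the composer and render the submitted turn.
    fn submit(&mut self) {
        let text = std::mem::take(&mut self.input);
        self.pending = Some(SubmitMark {
            blocks_len: self.blocks.len(),
            text: text.clone(),
        });
        self.blocks.push(Block::TurnHeader);
        self.blocks.push(Block::UserMessage(text));
        self.running = true;
    }

    /// Handle the rollback notification for the most recent submit.
    fn on_rolled_back(&mut self) {
        self.running = false;
        let hint = match self.pending.take() {
            Some(mark) => {
                // Drop every block rendered since the submit.
                self.blocks.truncate(mark.blocks_len);
                if self.input.is_empty() {
                    self.input = mark.text;
                    "restored your input"
                } else {
                    // Never overwrite a draft typed during the run.
                    self.backup = Some(mark.text);
                    "kept submitted input in backup"
                }
            }
            None => "nothing to restore",
        };
        // Always tell the user that something was rolled back.
        self.blocks.push(Block::Alert(hint.to_owned()));
    }
}
```

Truncating to a recorded length removes attachment and stats blocks without listing every block kind, at the cost of assuming that nothing unrelated to the run was appended in between.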
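## Completion criteria
- In the TUI, Submit followed by a Pause/Cancel before the AI response begins leaves the screen identical to the pre-submit state, with the original text back in the input box
- A Pause/Cancel after the AI has responded with even one token behaves as before (no rollback; Paused / Idle)
- The user can see that a rollback occurred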
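## Out of scope
- Multi-turn rollback (→ planned to be implemented as Fork in `tickets/pod-session-fork.md`)
- Stacking the rollback history (only the most recent one is restored)
- Equivalent UX for pod_cli / other clients (this ticket is limited to the TUI)
## Dependencies
- The Pod-side implementation in `tickets/pod-empty-turn-rollback.md`, plus finalizing the rollback notification event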