feat: add manual rewind control
This commit is contained in:
parent
85ffbaf10a
commit
f8881f7289
|
|
@ -23,7 +23,8 @@ use crate::spawn::comm_tools::{
|
|||
use crate::spawn::registry::SpawnedPodRegistry;
|
||||
use crate::spawn::tool::spawn_pod_tool;
|
||||
use protocol::{
|
||||
AlertLevel, AlertSource, ErrorCode, Event, Method, PodStatus, RunResult, Segment, TurnResult,
|
||||
AlertLevel, AlertSource, ErrorCode, Event, Method, PodStatus, RewindTargetId, RunResult,
|
||||
Segment, TurnResult,
|
||||
};
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
|
|
@ -781,6 +782,38 @@ async fn controller_loop<C, St>(
|
|||
}
|
||||
},
|
||||
|
||||
Method::ListRewindTargets => match shared_state.get_status() {
|
||||
PodStatus::Idle | PodStatus::Paused => emit_rewind_targets(&pod, &event_tx),
|
||||
PodStatus::Running => {
|
||||
let _ = event_tx.send(Event::Error {
|
||||
code: ErrorCode::AlreadyRunning,
|
||||
message: "Pod is already executing a turn; rewind can only run while idle or paused"
|
||||
.into(),
|
||||
});
|
||||
}
|
||||
},
|
||||
|
||||
Method::RewindTo {
|
||||
target,
|
||||
expected_head_entries,
|
||||
} => match shared_state.get_status() {
|
||||
PodStatus::Idle | PodStatus::Paused => {
|
||||
if apply_rewind(&mut pod, &event_tx, target, expected_head_entries) {
|
||||
shared_state.set_status(PodStatus::Idle);
|
||||
let _ = event_tx.send(Event::Status {
|
||||
status: PodStatus::Idle,
|
||||
});
|
||||
}
|
||||
}
|
||||
PodStatus::Running => {
|
||||
let _ = event_tx.send(Event::Error {
|
||||
code: ErrorCode::AlreadyRunning,
|
||||
message: "Pod is already executing a turn; rewind can only run while idle or paused"
|
||||
.into(),
|
||||
});
|
||||
}
|
||||
},
|
||||
|
||||
Method::Shutdown => {
|
||||
let _ = event_tx.send(Event::Shutdown);
|
||||
break;
|
||||
|
|
@ -1014,10 +1047,10 @@ where
|
|||
message: "Pod is already executing a turn".into(),
|
||||
});
|
||||
}
|
||||
Some(Method::Compact) => {
|
||||
Some(Method::Compact | Method::ListRewindTargets | Method::RewindTo { .. }) => {
|
||||
let _ = event_tx.send(Event::Error {
|
||||
code: ErrorCode::AlreadyRunning,
|
||||
message: "Pod is already executing a turn; compact can only run while idle"
|
||||
message: "Pod is already executing a turn; rewind/compact can only run while idle or paused"
|
||||
.into(),
|
||||
});
|
||||
}
|
||||
|
|
@ -1069,6 +1102,70 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
fn emit_rewind_targets<C, St>(pod: &Pod<C, St>, event_tx: &broadcast::Sender<Event>)
|
||||
where
|
||||
C: LlmClient,
|
||||
St: Store,
|
||||
{
|
||||
match pod.list_rewind_targets() {
|
||||
Ok((head_entries, targets)) => {
|
||||
let _ = event_tx.send(Event::RewindTargets {
|
||||
head_entries,
|
||||
targets,
|
||||
});
|
||||
}
|
||||
Err(err) => {
|
||||
let _ = event_tx.send(Event::Error {
|
||||
code: ErrorCode::Internal,
|
||||
message: err.to_string(),
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn apply_rewind<C, St>(
|
||||
pod: &mut Pod<C, St>,
|
||||
event_tx: &broadcast::Sender<Event>,
|
||||
target: RewindTargetId,
|
||||
expected_head_entries: usize,
|
||||
) -> bool
|
||||
where
|
||||
C: LlmClient,
|
||||
St: Store,
|
||||
{
|
||||
match pod.rewind_to(target, expected_head_entries) {
|
||||
Ok(applied) => match applied
|
||||
.entries
|
||||
.into_iter()
|
||||
.map(serde_json::to_value)
|
||||
.collect::<Result<Vec<_>, _>>()
|
||||
{
|
||||
Ok(entries) => {
|
||||
let _ = event_tx.send(Event::RewindApplied {
|
||||
entries,
|
||||
input: applied.input,
|
||||
summary: applied.summary,
|
||||
});
|
||||
true
|
||||
}
|
||||
Err(error) => {
|
||||
let _ = event_tx.send(Event::Error {
|
||||
code: ErrorCode::Internal,
|
||||
message: format!("failed to encode rewind snapshot: {error}"),
|
||||
});
|
||||
false
|
||||
}
|
||||
},
|
||||
Err(err) => {
|
||||
let _ = event_tx.send(Event::Error {
|
||||
code: ErrorCode::InvalidRequest,
|
||||
message: err.to_string(),
|
||||
});
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn build_greeting<C, St>(pod: &Pod<C, St>) -> protocol::Greeting
|
||||
where
|
||||
C: LlmClient,
|
||||
|
|
|
|||
|
|
@ -40,7 +40,9 @@ use crate::runtime::pod_registry::{self, ScopeAllocationGuard, ScopeLockError};
|
|||
use crate::workflow::WorkflowResolveError;
|
||||
use async_trait::async_trait;
|
||||
use llm_worker::interceptor::PreRequestAction;
|
||||
use protocol::{AlertLevel, AlertSource, Event, Segment};
|
||||
use protocol::{
|
||||
AlertLevel, AlertSource, Event, RewindSummary, RewindTarget, RewindTargetId, Segment,
|
||||
};
|
||||
use tokio::sync::broadcast;
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
|
|
@ -830,6 +832,85 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
&self.store
|
||||
}
|
||||
|
||||
/// List user-submitted turns in newest-first order for the manual rewind picker.
|
||||
pub fn list_rewind_targets(&self) -> Result<(usize, Vec<RewindTarget>), RewindError> {
|
||||
let loc = self.segment_state.location();
|
||||
let entries = self.store.read_all(loc.session_id, loc.segment_id)?;
|
||||
Ok((
|
||||
entries.len(),
|
||||
build_rewind_targets(loc.segment_id, &entries),
|
||||
))
|
||||
}
|
||||
|
||||
/// Truncate the current segment to just before a previously listed user input.
|
||||
pub fn rewind_to(
|
||||
&mut self,
|
||||
target: RewindTargetId,
|
||||
expected_head_entries: usize,
|
||||
) -> Result<RewindAppliedState, RewindError> {
|
||||
let loc = self.segment_state.location();
|
||||
if target.segment_id != loc.segment_id {
|
||||
return Err(RewindError::Invalid(
|
||||
"rewind target belongs to a different segment".into(),
|
||||
));
|
||||
}
|
||||
|
||||
let entries = self.store.read_all(loc.session_id, loc.segment_id)?;
|
||||
if entries.len() != expected_head_entries {
|
||||
return Err(RewindError::Invalid(format!(
|
||||
"session head changed since picker opened (expected {expected_head_entries}, current {})",
|
||||
entries.len()
|
||||
)));
|
||||
}
|
||||
|
||||
let Some(LogEntry::UserInput { segments, .. }) = entries.get(target.user_input_entry_index)
|
||||
else {
|
||||
return Err(RewindError::Invalid(
|
||||
"rewind target is no longer a user message".into(),
|
||||
));
|
||||
};
|
||||
let input = segments.clone();
|
||||
let truncate_entries = rewind_truncate_entries(&entries, target.user_input_entry_index);
|
||||
let retained = entries[..truncate_entries].to_vec();
|
||||
let tool_side_effect_warning = suffix_has_tool_side_effects(&entries[truncate_entries..]);
|
||||
let state = segment_log::collect_state(&retained);
|
||||
let extract_pointer = memory::extract::fold_pointer(&state.extensions);
|
||||
let task_store = tools::TaskStore::from_history(&state.history);
|
||||
let summary = RewindSummary {
|
||||
truncated_to_entries: truncate_entries,
|
||||
discarded_entries: entries.len().saturating_sub(truncate_entries),
|
||||
tool_side_effect_warning,
|
||||
};
|
||||
|
||||
self.store
|
||||
.truncate(loc.session_id, loc.segment_id, truncate_entries)?;
|
||||
self.segment_state.set_entries_written(truncate_entries);
|
||||
self.sink.truncate_silent(truncate_entries);
|
||||
|
||||
self.worker_mut().set_history(state.history);
|
||||
self.worker_mut().set_request_config(state.config);
|
||||
self.worker_mut().set_turn_count(state.turn_count);
|
||||
self.worker_mut()
|
||||
.set_last_run_interrupted(state.last_run_interrupted);
|
||||
self.user_segments = state.user_segments;
|
||||
*self.usage_history.lock().expect("usage_history poisoned") = state.usage_history;
|
||||
*self
|
||||
.pending_attachments
|
||||
.lock()
|
||||
.expect("pending_attachments poisoned") = Vec::new();
|
||||
*self
|
||||
.extract_pointer
|
||||
.lock()
|
||||
.expect("extract_pointer poisoned") = extract_pointer;
|
||||
self.task_store = task_store;
|
||||
|
||||
Ok(RewindAppliedState {
|
||||
entries: retained,
|
||||
input,
|
||||
summary,
|
||||
})
|
||||
}
|
||||
|
||||
fn write_pod_metadata_pending(&self) -> Result<(), StoreError> {
|
||||
let Some(writer) = &self.pod_metadata_writer else {
|
||||
return Ok(());
|
||||
|
|
@ -4328,6 +4409,110 @@ fn token_budget_bytes(tokens: u64) -> usize {
|
|||
}
|
||||
|
||||
/// Pod errors.
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum RewindError {
|
||||
#[error(transparent)]
|
||||
Store(#[from] StoreError),
|
||||
#[error("{0}")]
|
||||
Invalid(String),
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct RewindAppliedState {
|
||||
pub entries: Vec<LogEntry>,
|
||||
pub input: Vec<Segment>,
|
||||
pub summary: RewindSummary,
|
||||
}
|
||||
|
||||
fn build_rewind_targets(segment_id: uuid::Uuid, entries: &[LogEntry]) -> Vec<RewindTarget> {
|
||||
let head_entries = entries.len();
|
||||
let mut turn_index = 0usize;
|
||||
let mut targets = Vec::new();
|
||||
for (entry_index, entry) in entries.iter().enumerate() {
|
||||
if let LogEntry::UserInput { segments, ts } = entry {
|
||||
turn_index += 1;
|
||||
let truncate_entries = rewind_truncate_entries(entries, entry_index);
|
||||
let tool_warning = suffix_has_tool_side_effects(&entries[truncate_entries..]);
|
||||
targets.push(RewindTarget {
|
||||
id: RewindTargetId {
|
||||
segment_id,
|
||||
user_input_entry_index: entry_index,
|
||||
},
|
||||
expected_head_entries: head_entries,
|
||||
truncate_entries,
|
||||
turn_index,
|
||||
timestamp_ms: Some(*ts),
|
||||
preview: preview_segments(segments),
|
||||
eligible: true,
|
||||
disabled_reason: None,
|
||||
warning: tool_warning.then(|| {
|
||||
"history suffix will be discarded; tool side effects are not undone".into()
|
||||
}),
|
||||
});
|
||||
}
|
||||
}
|
||||
targets.reverse();
|
||||
targets
|
||||
}
|
||||
|
||||
fn rewind_truncate_entries(entries: &[LogEntry], user_input_entry_index: usize) -> usize {
|
||||
if user_input_entry_index > 0
|
||||
&& matches!(
|
||||
entries.get(user_input_entry_index - 1),
|
||||
Some(LogEntry::Invoke { .. })
|
||||
)
|
||||
{
|
||||
user_input_entry_index - 1
|
||||
} else {
|
||||
user_input_entry_index
|
||||
}
|
||||
}
|
||||
|
||||
fn suffix_has_tool_side_effects(entries: &[LogEntry]) -> bool {
|
||||
entries.iter().any(|entry| match entry {
|
||||
LogEntry::ToolResult { .. } => true,
|
||||
LogEntry::AssistantItem { item, .. } => logged_item_is_tool_call(item),
|
||||
_ => false,
|
||||
})
|
||||
}
|
||||
|
||||
fn logged_item_is_tool_call(item: &session_store::LoggedItem) -> bool {
|
||||
matches!(item, session_store::LoggedItem::ToolCall { .. })
|
||||
}
|
||||
|
||||
fn preview_segments(segments: &[Segment]) -> String {
|
||||
let mut preview = String::new();
|
||||
for segment in segments {
|
||||
if !preview.is_empty() {
|
||||
preview.push(' ');
|
||||
}
|
||||
match segment {
|
||||
Segment::Text { content } => preview.push_str(content.trim()),
|
||||
Segment::Paste { content, .. } => preview.push_str(content.trim()),
|
||||
Segment::FileRef { path } => {
|
||||
preview.push('@');
|
||||
preview.push_str(path);
|
||||
}
|
||||
Segment::KnowledgeRef { slug } => {
|
||||
preview.push('#');
|
||||
preview.push_str(slug);
|
||||
}
|
||||
Segment::WorkflowInvoke { slug } => {
|
||||
preview.push('/');
|
||||
preview.push_str(slug);
|
||||
}
|
||||
Segment::Unknown => preview.push_str("[unknown input segment]"),
|
||||
}
|
||||
}
|
||||
let preview = preview.replace(['\n', '\r'], " ");
|
||||
let mut chars = preview.chars();
|
||||
let mut out: String = chars.by_ref().take(120).collect();
|
||||
if chars.next().is_some() {
|
||||
out.push('…');
|
||||
}
|
||||
out
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum PodError {
|
||||
#[error(transparent)]
|
||||
|
|
@ -4809,6 +4994,156 @@ mod build_summary_prompt_tests {
|
|||
}
|
||||
}
|
||||
|
||||
fn text_segment(text: &str) -> Segment {
|
||||
Segment::Text {
|
||||
content: text.into(),
|
||||
}
|
||||
}
|
||||
|
||||
async fn rewind_test_pod() -> (tempfile::TempDir, Pod<NoopClient, session_store::FsStore>) {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let manifest = minimal_manifest_with_skills(vec![]);
|
||||
let store = session_store::FsStore::new(dir.path().join("sessions")).unwrap();
|
||||
let pwd = dir.path().join("workspace");
|
||||
std::fs::create_dir_all(&pwd).unwrap();
|
||||
let scope = Scope::writable(&pwd).unwrap();
|
||||
let mut pod = Pod::new(manifest, Worker::new(NoopClient), store, pwd, scope)
|
||||
.await
|
||||
.unwrap();
|
||||
pod.ensure_segment_head().unwrap();
|
||||
(dir, pod)
|
||||
}
|
||||
|
||||
fn append_test_entry(pod: &Pod<NoopClient, session_store::FsStore>, entry: LogEntry) {
|
||||
let loc = pod.segment_state.location();
|
||||
pod.store
|
||||
.append(loc.session_id, loc.segment_id, &entry)
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
fn append_user_turn(pod: &Pod<NoopClient, session_store::FsStore>, ts: u64, text: &str) {
|
||||
append_test_entry(
|
||||
pod,
|
||||
LogEntry::Invoke {
|
||||
ts,
|
||||
trigger: protocol::InvokeKind::UserSend,
|
||||
},
|
||||
);
|
||||
append_test_entry(
|
||||
pod,
|
||||
LogEntry::UserInput {
|
||||
ts: ts + 1,
|
||||
segments: vec![text_segment(text)],
|
||||
},
|
||||
);
|
||||
append_test_entry(
|
||||
pod,
|
||||
LogEntry::TurnEnd {
|
||||
ts: ts + 2,
|
||||
turn_count: 1,
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn rewind_target_listing_is_newest_first_and_warns_on_tool_suffix() {
|
||||
let (_dir, pod) = rewind_test_pod().await;
|
||||
append_user_turn(&pod, 10, "first message");
|
||||
append_user_turn(&pod, 20, "second message");
|
||||
append_test_entry(
|
||||
&pod,
|
||||
LogEntry::ToolResult {
|
||||
ts: 30,
|
||||
item: session_store::LoggedItem::ToolResult {
|
||||
call_id: "call-1".into(),
|
||||
summary: "wrote a file".into(),
|
||||
content: None,
|
||||
is_error: false,
|
||||
},
|
||||
},
|
||||
);
|
||||
|
||||
let (head_entries, targets) = pod.list_rewind_targets().unwrap();
|
||||
let loc = pod.segment_state.location();
|
||||
|
||||
assert_eq!(
|
||||
head_entries,
|
||||
pod.store
|
||||
.read_all(loc.session_id, loc.segment_id)
|
||||
.unwrap()
|
||||
.len()
|
||||
);
|
||||
assert_eq!(targets.len(), 2);
|
||||
assert_eq!(targets[0].preview, "second message");
|
||||
assert_eq!(targets[1].preview, "first message");
|
||||
assert!(
|
||||
targets[0]
|
||||
.warning
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.contains("tool side effects")
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn rewind_apply_truncates_log_and_restores_selected_input() {
|
||||
let (_dir, mut pod) = rewind_test_pod().await;
|
||||
append_user_turn(&pod, 10, "first message");
|
||||
append_user_turn(&pod, 20, "second message");
|
||||
append_test_entry(
|
||||
&pod,
|
||||
LogEntry::ToolResult {
|
||||
ts: 30,
|
||||
item: session_store::LoggedItem::ToolResult {
|
||||
call_id: "call-1".into(),
|
||||
summary: "wrote a file".into(),
|
||||
content: None,
|
||||
is_error: false,
|
||||
},
|
||||
},
|
||||
);
|
||||
let (head_entries, targets) = pod.list_rewind_targets().unwrap();
|
||||
let expected_truncate_entries = targets[0].truncate_entries;
|
||||
let target = targets[0].id.clone();
|
||||
|
||||
let applied = pod.rewind_to(target, head_entries).unwrap();
|
||||
|
||||
assert_eq!(preview_segments(&applied.input), "second message");
|
||||
assert_eq!(
|
||||
applied.summary.truncated_to_entries,
|
||||
expected_truncate_entries
|
||||
);
|
||||
assert!(applied.summary.tool_side_effect_warning);
|
||||
let loc = pod.segment_state.location();
|
||||
assert_eq!(
|
||||
pod.store
|
||||
.read_all(loc.session_id, loc.segment_id)
|
||||
.unwrap()
|
||||
.len(),
|
||||
expected_truncate_entries
|
||||
);
|
||||
assert_eq!(pod.worker().history().len(), 1);
|
||||
assert_eq!(
|
||||
pod.worker().history()[0].as_text().unwrap(),
|
||||
"first message"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn rewind_apply_rejects_stale_head() {
|
||||
let (_dir, mut pod) = rewind_test_pod().await;
|
||||
append_user_turn(&pod, 10, "first message");
|
||||
let (head_entries, targets) = pod.list_rewind_targets().unwrap();
|
||||
append_user_turn(&pod, 20, "newer message");
|
||||
|
||||
let err = pod
|
||||
.rewind_to(targets[0].id.clone(), head_entries)
|
||||
.unwrap_err()
|
||||
.to_string();
|
||||
|
||||
assert!(err.contains("session head changed"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn apply_interrupt_prep_appends_via_callback_and_logs_independent_entries() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
|
|
|
|||
|
|
@ -36,6 +36,14 @@ pub enum Method {
|
|||
/// This is a typed control method: clients must not send `compact` as a
|
||||
/// `Method::Run` user message.
|
||||
Compact,
|
||||
/// Ask the Pod to list valid rewind targets from its authoritative session log.
|
||||
ListRewindTargets,
|
||||
/// Truncate the current session back to the selected rewind target and
|
||||
/// return the selected user input to the client composer.
|
||||
RewindTo {
|
||||
target: RewindTargetId,
|
||||
expected_head_entries: usize,
|
||||
},
|
||||
Shutdown,
|
||||
/// Request a list of completion candidates from the Pod.
|
||||
///
|
||||
|
|
@ -125,7 +133,7 @@ pub enum PodEvent {
|
|||
/// variants — emits an alert and inserts a `[unknown input segment]`
|
||||
/// placeholder into the LLM context so neither user nor LLM is blind to
|
||||
/// the dropped intent.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
#[serde(tag = "kind", rename_all = "snake_case")]
|
||||
pub enum Segment {
|
||||
/// Free-form text. The fallback every client can produce.
|
||||
|
|
@ -433,6 +441,19 @@ pub enum Event {
|
|||
kind: CompletionKind,
|
||||
entries: Vec<CompletionEntry>,
|
||||
},
|
||||
/// Reply to `Method::ListRewindTargets`. Clients should only open a picker
|
||||
/// in response to their own pending request; the event may be broadcast.
|
||||
RewindTargets {
|
||||
head_entries: usize,
|
||||
targets: Vec<RewindTarget>,
|
||||
},
|
||||
/// A rewind has truncated the authoritative session. `entries` is the
|
||||
/// retained session-log prefix clients should use to reseed display state.
|
||||
RewindApplied {
|
||||
entries: Vec<serde_json::Value>,
|
||||
input: Vec<Segment>,
|
||||
summary: RewindSummary,
|
||||
},
|
||||
/// Reply to `Method::ListVisiblePods`. Payload is a stable JSON value so
|
||||
/// the Pod crate can evolve discovery fields without introducing a protocol
|
||||
/// dependency on session-store.
|
||||
|
|
@ -545,6 +566,34 @@ pub struct CompletionEntry {
|
|||
pub is_dir: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct RewindTargetId {
|
||||
pub segment_id: uuid::Uuid,
|
||||
pub user_input_entry_index: usize,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct RewindTarget {
|
||||
pub id: RewindTargetId,
|
||||
pub expected_head_entries: usize,
|
||||
pub truncate_entries: usize,
|
||||
pub turn_index: usize,
|
||||
pub timestamp_ms: Option<u64>,
|
||||
pub preview: String,
|
||||
pub eligible: bool,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub disabled_reason: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub warning: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct RewindSummary {
|
||||
pub truncated_to_entries: usize,
|
||||
pub discarded_entries: usize,
|
||||
pub tool_side_effect_warning: bool,
|
||||
}
|
||||
|
||||
/// Pod self-description rendered by the TUI when a session starts empty.
|
||||
///
|
||||
/// Built once in the Pod controller from the resolved manifest and
|
||||
|
|
|
|||
|
|
@ -2,8 +2,8 @@ use std::collections::VecDeque;
|
|||
use std::time::Instant;
|
||||
|
||||
use protocol::{
|
||||
AlertLevel, AlertSource, CompletionEntry, CompletionKind, Event, Method, PodStatus, RunResult,
|
||||
Segment,
|
||||
AlertLevel, AlertSource, CompletionEntry, CompletionKind, Event, Method, PodStatus,
|
||||
RewindTarget, RunResult, Segment,
|
||||
};
|
||||
|
||||
use crate::block::{
|
||||
|
|
@ -51,6 +51,19 @@ impl CompletionState {
|
|||
pub const MAX_VISIBLE: usize = 6;
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct RewindPickerState {
|
||||
pub head_entries: usize,
|
||||
pub targets: Vec<RewindTarget>,
|
||||
pub selected: usize,
|
||||
}
|
||||
|
||||
impl RewindPickerState {
|
||||
pub fn selected_target(&self) -> Option<&RewindTarget> {
|
||||
self.targets.get(self.selected)
|
||||
}
|
||||
}
|
||||
|
||||
struct RollbackSubmitState {
|
||||
text: String,
|
||||
segments: Vec<Segment>,
|
||||
|
|
@ -126,6 +139,10 @@ pub struct App {
|
|||
/// Completion popup state, when an `@` / `#` / `/` token is in
|
||||
/// flight. `None` whenever the trigger conditions don't hold.
|
||||
pub completion: Option<CompletionState>,
|
||||
/// Dedicated main-view rewind picker state.
|
||||
pub rewind_picker: Option<RewindPickerState>,
|
||||
rewind_request_pending: bool,
|
||||
greeting: Option<protocol::Greeting>,
|
||||
/// In-TUI mirror of the Pod's session task store, reconstructed
|
||||
/// directly from observed `TaskCreate` / `TaskUpdate` tool calls and
|
||||
/// `[Session TaskStore snapshot]` system messages — no protocol
|
||||
|
|
@ -177,6 +194,9 @@ impl App {
|
|||
cache: FileCache::new(),
|
||||
assistant_streaming: false,
|
||||
completion: None,
|
||||
rewind_picker: None,
|
||||
rewind_request_pending: false,
|
||||
greeting: None,
|
||||
task_store: TaskStore::new(),
|
||||
task_pane_open: false,
|
||||
task_pane_scroll: 0,
|
||||
|
|
@ -921,6 +941,48 @@ impl App {
|
|||
state.selected = 0;
|
||||
}
|
||||
}
|
||||
Event::RewindTargets {
|
||||
head_entries,
|
||||
targets,
|
||||
} => {
|
||||
if self.rewind_request_pending {
|
||||
self.rewind_request_pending = false;
|
||||
let selected = targets.iter().position(|t| t.eligible).unwrap_or(0);
|
||||
self.rewind_picker = Some(RewindPickerState {
|
||||
head_entries,
|
||||
targets,
|
||||
selected,
|
||||
});
|
||||
self.scroll = Scroll::default();
|
||||
}
|
||||
}
|
||||
Event::RewindApplied {
|
||||
entries,
|
||||
input,
|
||||
summary,
|
||||
} => {
|
||||
if let Some(greeting) = self.greeting.clone() {
|
||||
self.restore_snapshot(&entries, greeting);
|
||||
}
|
||||
self.input.replace_with_segments(&input);
|
||||
self.completion = None;
|
||||
self.close_rewind_picker();
|
||||
self.reset_run_state(self.pod_status);
|
||||
let mut message = format!(
|
||||
"Rewound session: discarded {} log entries; restored selected input to composer.",
|
||||
summary.discarded_entries
|
||||
);
|
||||
if summary.tool_side_effect_warning {
|
||||
message.push_str(
|
||||
" History suffix was discarded; tool side effects were not undone.",
|
||||
);
|
||||
}
|
||||
self.blocks.push(Block::Alert {
|
||||
level: AlertLevel::Warn,
|
||||
source: AlertSource::Pod,
|
||||
message,
|
||||
});
|
||||
}
|
||||
Event::VisiblePods { .. }
|
||||
| Event::PodInspection { .. }
|
||||
| Event::PodAttachRestore { .. } => {}
|
||||
|
|
@ -1220,6 +1282,70 @@ impl App {
|
|||
self.command_input.insert_str(&completed);
|
||||
}
|
||||
|
||||
pub fn request_rewind_picker(&mut self) -> Option<Method> {
|
||||
if !self.connected {
|
||||
self.push_command_diagnostic("cannot rewind before the Pod is connected");
|
||||
return None;
|
||||
}
|
||||
if self.running {
|
||||
self.push_command_diagnostic("cannot rewind while the Pod is running");
|
||||
return None;
|
||||
}
|
||||
self.completion = None;
|
||||
self.rewind_picker = None;
|
||||
self.rewind_request_pending = true;
|
||||
Some(Method::ListRewindTargets)
|
||||
}
|
||||
|
||||
pub fn close_rewind_picker(&mut self) {
|
||||
self.rewind_picker = None;
|
||||
self.rewind_request_pending = false;
|
||||
}
|
||||
|
||||
pub fn rewind_picker_up(&mut self) {
|
||||
if let Some(picker) = self.rewind_picker.as_mut() {
|
||||
if picker.targets.is_empty() {
|
||||
return;
|
||||
}
|
||||
picker.selected = if picker.selected == 0 {
|
||||
picker.targets.len() - 1
|
||||
} else {
|
||||
picker.selected - 1
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
pub fn rewind_picker_down(&mut self) {
|
||||
if let Some(picker) = self.rewind_picker.as_mut() {
|
||||
if !picker.targets.is_empty() {
|
||||
picker.selected = (picker.selected + 1) % picker.targets.len();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn submit_rewind_picker(&mut self) -> Option<Method> {
|
||||
let Some(picker) = self.rewind_picker.as_ref() else {
|
||||
return None;
|
||||
};
|
||||
let Some(target) = picker.selected_target() else {
|
||||
self.push_command_diagnostic("no rewind target is available");
|
||||
return None;
|
||||
};
|
||||
if !target.eligible {
|
||||
self.push_command_diagnostic(
|
||||
target
|
||||
.disabled_reason
|
||||
.clone()
|
||||
.unwrap_or_else(|| "rewind target is disabled".into()),
|
||||
);
|
||||
return None;
|
||||
}
|
||||
Some(Method::RewindTo {
|
||||
target: target.id.clone(),
|
||||
expected_head_entries: target.expected_head_entries,
|
||||
})
|
||||
}
|
||||
|
||||
fn command_environment(&self) -> CommandEnvironment {
|
||||
CommandEnvironment {
|
||||
connected: self.connected,
|
||||
|
|
@ -1247,6 +1373,11 @@ impl App {
|
|||
self.input_mode = CommandInputMode::Composer;
|
||||
self.command_completion_selected = None;
|
||||
}
|
||||
if let Some(Method::ListRewindTargets) = result.method.as_ref() {
|
||||
self.completion = None;
|
||||
self.rewind_picker = None;
|
||||
self.rewind_request_pending = true;
|
||||
}
|
||||
result.method
|
||||
}
|
||||
|
||||
|
|
@ -1334,6 +1465,7 @@ impl App {
|
|||
/// produced. Followed by `Event::Entry` updates for anything
|
||||
/// committed after the snapshot.
|
||||
fn restore_snapshot(&mut self, entries: &[serde_json::Value], greeting: protocol::Greeting) {
|
||||
self.greeting = Some(greeting.clone());
|
||||
self.context_window = greeting.context_window;
|
||||
self.session_context_tokens = greeting.context_tokens;
|
||||
self.turn_index = 0;
|
||||
|
|
|
|||
|
|
@ -147,6 +147,15 @@ impl CommandRegistry {
|
|||
can_execute: compact_available,
|
||||
executor: compact_command,
|
||||
});
|
||||
registry.register(CommandSpec {
|
||||
name: "rewind",
|
||||
aliases: &["rollback"],
|
||||
usage: "rewind",
|
||||
description: "Open the rewind target picker.",
|
||||
argument_parser: rewind_args,
|
||||
can_execute: rewind_available,
|
||||
executor: rewind_command,
|
||||
});
|
||||
registry
|
||||
}
|
||||
|
||||
|
|
@ -284,6 +293,15 @@ fn compact_args(raw: &str) -> Result<CommandArgs, CommandDiagnostic> {
|
|||
}
|
||||
}
|
||||
|
||||
fn rewind_args(raw: &str) -> Result<CommandArgs, CommandDiagnostic> {
|
||||
let args = CommandArgs::parse_whitespace(raw);
|
||||
if args.argv().is_empty() {
|
||||
Ok(args)
|
||||
} else {
|
||||
Err(CommandDiagnostic::new("Invalid arguments. Usage: rewind"))
|
||||
}
|
||||
}
|
||||
|
||||
fn compact_available(environment: &CommandEnvironment) -> Result<(), CommandDiagnostic> {
|
||||
if !environment.connected {
|
||||
return Err(CommandDiagnostic::new(
|
||||
|
|
@ -303,6 +321,20 @@ fn compact_available(environment: &CommandEnvironment) -> Result<(), CommandDiag
|
|||
Ok(())
|
||||
}
|
||||
|
||||
fn rewind_available(environment: &CommandEnvironment) -> Result<(), CommandDiagnostic> {
|
||||
if !environment.connected {
|
||||
return Err(CommandDiagnostic::new(
|
||||
"Cannot rewind before the Pod is connected.",
|
||||
));
|
||||
}
|
||||
if environment.running {
|
||||
return Err(CommandDiagnostic::new(
|
||||
"Cannot rewind while the Pod is running.",
|
||||
));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn help_command(invocation: CommandInvocation<'_>) -> CommandExecution {
|
||||
if let Some(name) = invocation.args.argv().first() {
|
||||
let Some(command) = invocation.registry.find(name) else {
|
||||
|
|
@ -350,6 +382,18 @@ fn compact_command(invocation: CommandInvocation<'_>) -> CommandExecution {
|
|||
}
|
||||
}
|
||||
|
||||
fn rewind_command(invocation: CommandInvocation<'_>) -> CommandExecution {
|
||||
let _ = invocation.command;
|
||||
let _ = invocation.environment;
|
||||
let _ = invocation.args.raw();
|
||||
CommandExecution {
|
||||
method: Some(Method::ListRewindTargets),
|
||||
diagnostics: vec![CommandDiagnostic::new("rewind picker requested")],
|
||||
exit_command_mode: true,
|
||||
clear_input: true,
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
|
@ -421,4 +465,40 @@ mod tests {
|
|||
assert!(result.method.is_none());
|
||||
assert!(result.diagnostics[0].message.contains("paused"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn rewind_command_and_alias_return_list_method() {
|
||||
let registry = CommandRegistry::builtins();
|
||||
for command in ["rewind", "rollback"] {
|
||||
let result = registry.dispatch(command, &env());
|
||||
assert!(matches!(result.method, Some(Method::ListRewindTargets)));
|
||||
assert!(result.exit_command_mode);
|
||||
assert!(result.clear_input);
|
||||
assert!(result.diagnostics[0].message.contains("rewind picker"));
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn rewind_invalid_arguments_are_local_diagnostic() {
|
||||
let registry = CommandRegistry::builtins();
|
||||
let result = registry.dispatch("rewind now", &env());
|
||||
assert!(result.method.is_none());
|
||||
assert!(!result.exit_command_mode);
|
||||
assert!(result.diagnostics[0].message.contains("Invalid arguments"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn rewind_rejects_running_but_allows_paused() {
|
||||
let registry = CommandRegistry::builtins();
|
||||
let mut running = env();
|
||||
running.running = true;
|
||||
let result = registry.dispatch("rewind", &running);
|
||||
assert!(result.method.is_none());
|
||||
assert!(result.diagnostics[0].message.contains("running"));
|
||||
|
||||
let mut paused = env();
|
||||
paused.paused = true;
|
||||
let result = registry.dispatch("rewind", &paused);
|
||||
assert!(matches!(result.method, Some(Method::ListRewindTargets)));
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -801,6 +801,9 @@ fn handle_key(app: &mut App, key: KeyEvent) -> Option<Method> {
|
|||
app.toggle_task_pane();
|
||||
Some(None)
|
||||
}
|
||||
KeyCode::Char(c) if c.eq_ignore_ascii_case(&'r') && ctrl => {
|
||||
Some(app.request_rewind_picker())
|
||||
}
|
||||
KeyCode::Char('a') if ctrl => {
|
||||
app.move_cursor_start();
|
||||
Some(app.refresh_completion())
|
||||
|
|
@ -880,6 +883,25 @@ fn handle_key(app: &mut App, key: KeyEvent) -> Option<Method> {
|
|||
return handle_command_key(app, key);
|
||||
}
|
||||
|
||||
if app.rewind_picker.is_some() {
|
||||
match key.code {
|
||||
KeyCode::Esc => {
|
||||
app.close_rewind_picker();
|
||||
return None;
|
||||
}
|
||||
KeyCode::Enter => return app.submit_rewind_picker(),
|
||||
KeyCode::Up => {
|
||||
app.rewind_picker_up();
|
||||
return None;
|
||||
}
|
||||
KeyCode::Down => {
|
||||
app.rewind_picker_down();
|
||||
return None;
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
// Completion popup overrides — only when there's something to
|
||||
// navigate / commit. An empty popup (request in flight) falls
|
||||
// through to the default behaviour.
|
||||
|
|
@ -1079,6 +1101,7 @@ fn handle_pause_or_quit(app: &mut App) -> Option<Method> {
|
|||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use protocol::{Event, Segment};
|
||||
|
||||
#[test]
|
||||
fn parse_pod_name_mode() {
|
||||
|
|
@ -1542,6 +1565,104 @@ mod tests {
|
|||
}));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn ctrl_r_requests_rewind_picker_when_idle_or_paused() {
|
||||
let mut app = App::new("agent".to_string());
|
||||
app.connected = true;
|
||||
let idle = handle_key(
|
||||
&mut app,
|
||||
KeyEvent::new(KeyCode::Char('r'), KeyModifiers::CONTROL),
|
||||
);
|
||||
assert!(matches!(idle, Some(Method::ListRewindTargets)));
|
||||
|
||||
app.set_pod_status(PodStatus::Paused);
|
||||
let paused = handle_key(
|
||||
&mut app,
|
||||
KeyEvent::new(KeyCode::Char('r'), KeyModifiers::CONTROL),
|
||||
);
|
||||
assert!(matches!(paused, Some(Method::ListRewindTargets)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn ctrl_r_is_rejected_while_running() {
|
||||
let mut app = App::new("agent".to_string());
|
||||
app.connected = true;
|
||||
app.set_pod_status(PodStatus::Running);
|
||||
|
||||
let method = handle_key(
|
||||
&mut app,
|
||||
KeyEvent::new(KeyCode::Char('r'), KeyModifiers::CONTROL),
|
||||
);
|
||||
|
||||
assert!(method.is_none());
|
||||
assert!(has_alert(&app, "cannot rewind while the Pod is running"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn rewind_picker_close_returns_to_history_view() {
|
||||
let mut app = App::new("agent".to_string());
|
||||
app.connected = true;
|
||||
app.handle_pod_event(Event::RewindTargets {
|
||||
head_entries: 1,
|
||||
targets: vec![],
|
||||
});
|
||||
assert!(app.rewind_picker.is_none());
|
||||
|
||||
let method = handle_key(
|
||||
&mut app,
|
||||
KeyEvent::new(KeyCode::Char('r'), KeyModifiers::CONTROL),
|
||||
);
|
||||
assert!(matches!(method, Some(Method::ListRewindTargets)));
|
||||
app.handle_pod_event(Event::RewindTargets {
|
||||
head_entries: 1,
|
||||
targets: vec![],
|
||||
});
|
||||
assert!(app.rewind_picker.is_some());
|
||||
|
||||
let method = handle_key(&mut app, KeyEvent::new(KeyCode::Esc, KeyModifiers::NONE));
|
||||
|
||||
assert!(method.is_none());
|
||||
assert!(app.rewind_picker.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn rewind_applied_reseeds_display_and_restores_composer() {
|
||||
let mut app = App::new("agent".to_string());
|
||||
app.handle_pod_event(Event::Snapshot {
|
||||
greeting: test_greeting(),
|
||||
entries: vec![],
|
||||
status: PodStatus::Idle,
|
||||
});
|
||||
app.handle_pod_event(Event::RewindApplied {
|
||||
entries: vec![],
|
||||
input: vec![Segment::Text {
|
||||
content: "retry this".into(),
|
||||
}],
|
||||
summary: protocol::RewindSummary {
|
||||
truncated_to_entries: 0,
|
||||
discarded_entries: 2,
|
||||
tool_side_effect_warning: true,
|
||||
},
|
||||
});
|
||||
|
||||
assert_eq!(input_text(&app), "retry this");
|
||||
assert!(app.rewind_picker.is_none());
|
||||
assert!(has_alert(&app, "tool side effects"));
|
||||
}
|
||||
|
||||
fn test_greeting() -> protocol::Greeting {
|
||||
protocol::Greeting {
|
||||
pod_name: "agent".into(),
|
||||
cwd: "/tmp".into(),
|
||||
provider: "test".into(),
|
||||
model: "test".into(),
|
||||
scope_summary: "".into(),
|
||||
tools: vec![],
|
||||
context_window: 0,
|
||||
context_tokens: 0,
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn command_registry_suggestions_are_available() {
|
||||
let mut app = App::new("agent".to_string());
|
||||
|
|
|
|||
|
|
@ -419,6 +419,11 @@ fn draw_history(frame: &mut Frame, app: &mut App, area: Rect) {
|
|||
return;
|
||||
}
|
||||
|
||||
if let Some(picker) = app.rewind_picker.clone() {
|
||||
draw_rewind_picker(frame, app, history_area, inner, outer_block, &picker);
|
||||
return;
|
||||
}
|
||||
|
||||
let HistoryLayout { lines, turn_starts } = compute_history(app, inner.width);
|
||||
|
||||
// `lines` is already pre-wrapped: 1 entry == 1 terminal row. Scroll
|
||||
|
|
@ -447,6 +452,101 @@ fn draw_history(frame: &mut Frame, app: &mut App, area: Rect) {
|
|||
.render(history_area, frame.buffer_mut());
|
||||
}
|
||||
|
||||
fn draw_rewind_picker(
|
||||
frame: &mut Frame,
|
||||
app: &mut App,
|
||||
history_area: Rect,
|
||||
inner: Rect,
|
||||
outer_block: UiBlock<'_>,
|
||||
picker: &crate::app::RewindPickerState,
|
||||
) {
|
||||
let mut logical: Vec<Line<'static>> = Vec::new();
|
||||
logical.push(Line::from(vec![
|
||||
Span::styled(
|
||||
"Rewind targets",
|
||||
Style::default()
|
||||
.fg(Color::Yellow)
|
||||
.add_modifier(Modifier::BOLD),
|
||||
),
|
||||
Span::raw(format!(" head={} ", picker.head_entries)),
|
||||
Span::styled("Enter", Style::default().fg(Color::Green)),
|
||||
Span::raw(" apply "),
|
||||
Span::styled("Esc", Style::default().fg(Color::Green)),
|
||||
Span::raw(" cancel"),
|
||||
]));
|
||||
logical.push(Line::from(Span::styled(
|
||||
"Selecting a target discards the later history suffix; tool side effects are not undone.",
|
||||
Style::default().fg(Color::DarkGray),
|
||||
)));
|
||||
logical.push(Line::from(""));
|
||||
|
||||
if picker.targets.is_empty() {
|
||||
logical.push(Line::from(Span::styled(
|
||||
"No previous user messages are available to rewind.",
|
||||
Style::default().fg(Color::DarkGray),
|
||||
)));
|
||||
} else {
|
||||
for (idx, target) in picker.targets.iter().enumerate() {
|
||||
let selected = idx == picker.selected;
|
||||
let marker = if selected { "▶" } else { " " };
|
||||
let base_style = if selected {
|
||||
Style::default()
|
||||
.bg(Color::DarkGray)
|
||||
.add_modifier(Modifier::BOLD)
|
||||
} else if target.eligible {
|
||||
Style::default()
|
||||
} else {
|
||||
Style::default().fg(Color::DarkGray)
|
||||
};
|
||||
let ts = target
|
||||
.timestamp_ms
|
||||
.map(|ts| format!("{}", ts))
|
||||
.unwrap_or_else(|| "-".into());
|
||||
logical.push(Line::from(vec![
|
||||
Span::styled(marker.to_owned(), base_style),
|
||||
Span::styled(
|
||||
format!(
|
||||
" turn {} idx {} ts {} ",
|
||||
target.turn_index, target.id.user_input_entry_index, ts
|
||||
),
|
||||
base_style,
|
||||
),
|
||||
Span::styled(target.preview.clone(), base_style),
|
||||
]));
|
||||
if let Some(warning) = target.warning.as_ref() {
|
||||
logical.push(Line::from(Span::styled(
|
||||
format!(" warning: {warning}"),
|
||||
Style::default().fg(Color::Yellow),
|
||||
)));
|
||||
}
|
||||
if let Some(reason) = target.disabled_reason.as_ref() {
|
||||
logical.push(Line::from(Span::styled(
|
||||
format!(" disabled: {reason}"),
|
||||
Style::default().fg(Color::Red),
|
||||
)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let mut lines = Vec::new();
|
||||
for line in logical {
|
||||
wrap_line_into(line, inner.width, &mut lines);
|
||||
}
|
||||
|
||||
let tail_top = lines.len().saturating_sub(inner.height as usize);
|
||||
app.scroll.area_height = inner.height;
|
||||
app.scroll.total_lines = lines.len();
|
||||
app.scroll.tail_top_offset = tail_top;
|
||||
app.scroll.turn_starts.clear();
|
||||
app.scroll.top_offset = app.scroll.top_offset.min(tail_top);
|
||||
|
||||
let end = (app.scroll.top_offset + inner.height as usize).min(lines.len());
|
||||
let visible = lines[app.scroll.top_offset..end].to_vec();
|
||||
Paragraph::new(visible)
|
||||
.block(outer_block)
|
||||
.render(history_area, frame.buffer_mut());
|
||||
}
|
||||
|
||||
/// Width to reserve for the task side pane within the history rect.
|
||||
/// Returns 0 when the pane is closed or the rect is too narrow to host
|
||||
/// it without crushing the history view.
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user