diff --git a/crates/pod/src/controller.rs b/crates/pod/src/controller.rs index ae0c339e..2bab6f17 100644 --- a/crates/pod/src/controller.rs +++ b/crates/pod/src/controller.rs @@ -732,6 +732,31 @@ async fn controller_loop( } } + Method::Compact => match shared_state.get_status() { + PodStatus::Idle => { + if let Err(error) = pod.manual_compact().await { + let _ = event_tx.send(Event::Error { + code: worker_error_code(&error), + message: error.to_string(), + }); + } + } + PodStatus::Paused => { + let _ = event_tx.send(Event::Error { + code: ErrorCode::InvalidRequest, + message: "Cannot compact while the Pod is paused; resume or start a fresh turn first" + .into(), + }); + } + PodStatus::Running => { + let _ = event_tx.send(Event::Error { + code: ErrorCode::AlreadyRunning, + message: "Pod is already executing a turn; compact can only run while idle" + .into(), + }); + } + }, + Method::Shutdown => { let _ = event_tx.send(Event::Shutdown); break; @@ -965,6 +990,13 @@ where message: "Pod is already executing a turn".into(), }); } + Some(Method::Compact) => { + let _ = event_tx.send(Event::Error { + code: ErrorCode::AlreadyRunning, + message: "Pod is already executing a turn; compact can only run while idle" + .into(), + }); + } Some(Method::Notify { message }) => { // Live echo arrives via `Event::SystemItem` once // the in-flight turn's next `pre_llm_request` @@ -1320,4 +1352,46 @@ mod tests { "expected no PodEvent for notification-originated worker error" ); } + + #[tokio::test] + async fn compact_method_is_rejected_while_running() { + let mut env = make_env().await; + let mut events = env.event_tx.subscribe(); + env._method_tx + .send(Method::Compact) + .await + .expect("send compact"); + + let pod_future = async { + tokio::time::sleep(Duration::from_millis(50)).await; + Ok::<_, PodError>(PodRunResult::Finished) + }; + let (status, shutdown) = drive_turn( + pod_future, + &mut env.method_rx, + &env.event_tx, + &env.cancel_tx, + &env.shared_state, + &env.notify_buffer, + Some(&env.parent_socket_path), + "child-pod", + &env.spawned_registry, + false, + ) + .await; + assert_eq!(status, PodStatus::Idle); + assert!(!shutdown); + + let event = tokio::time::timeout(Duration::from_secs(1), events.recv()) + .await + .expect("event timeout") + .expect("event"); + match event { + Event::Error { code, message } => { + assert_eq!(code, ErrorCode::AlreadyRunning); + assert!(message.contains("compact"), "got message: {message}"); + } + other => panic!("expected compact rejection error, got {other:?}"), + } + } } diff --git a/crates/pod/src/pod.rs b/crates/pod/src/pod.rs index 396bdcea..f5cc55aa 100644 --- a/crates/pod/src/pod.rs +++ b/crates/pod/src/pod.rs @@ -2008,6 +2008,86 @@ impl Pod { } } + /// Run an explicit user-requested compaction between turns. + /// + /// The controller only calls this while Idle. Paused turns keep their + /// interrupted Worker state intact and are intentionally rejected before + /// this method is reached. + pub async fn manual_compact(&mut self) -> Result { + if self.manifest.compaction.is_none() { + let message = + "manual compact is unavailable because [compaction] is not configured".to_string(); + self.alert(AlertLevel::Warn, AlertSource::Compactor, message.clone()); + return Ok(ManualCompactResult::Skipped { message }); + } + + if self.history().is_empty() { + let message = "manual compact skipped: no conversation history to compact".to_string(); + self.alert(AlertLevel::Warn, AlertSource::Compactor, message.clone()); + return Ok(ManualCompactResult::Skipped { message }); + } + + self.ensure_interceptor_installed(); + self.cleanup_finished_memory_task(); + self.ensure_segment_head()?; + + let state = self.compact_state.clone(); + if state.as_ref().is_some_and(|s| s.is_disabled()) { + let message = + "manual compact is disabled after repeated compaction failures".to_string(); + self.alert(AlertLevel::Warn, AlertSource::Compactor, message.clone()); + return Ok(ManualCompactResult::Skipped { message }); + } + + let retained = state + .as_ref() + .map(|s| s.retained_tokens()) + .or_else(|| { + self.manifest + .compaction + .as_ref() + .map(|c| c.compact_retained_tokens) + }) + .unwrap_or(manifest::defaults::COMPACT_RETAINED_TOKENS); + let current_tokens = self.total_tokens().tokens; + let cut = self.split_for_retained(retained); + if cut.index == 0 { + let message = format!( + "manual compact skipped: current context is within the retained tail ({current_tokens} <= {retained} tokens)" + ); + self.alert(AlertLevel::Warn, AlertSource::Compactor, message.clone()); + return Ok(ManualCompactResult::Skipped { message }); + } + + self.join_memory_task().await; + self.send_event(Event::CompactStart); + match self.compact(retained).await { + Ok(new_segment_id) => { + info!(new_segment_id = %new_segment_id, "Manual compaction succeeded"); + self.send_event(Event::CompactDone { new_segment_id }); + if let Some(ref state) = state { + state.record_compact_success(); + } + Ok(ManualCompactResult::Compacted { new_segment_id }) + } + Err(e) => { + warn!(error = %e, "Manual compaction failed"); + self.send_event(Event::CompactFailed { + error: e.to_string(), + }); + self.alert( + AlertLevel::Error, + AlertSource::Compactor, + format!("manual compaction failed: {e}"), + ); + if let Some(ref state) = state { + state.record_compact_failure(); + } + Err(e) + } + } + } + /// Persist delta + turn end + outcome after a run/resume. async fn persist_turn( &mut self, @@ -3307,6 +3387,15 @@ pub enum PodRunResult { RolledBack, } +/// Result of a manual compaction request. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum ManualCompactResult { + /// The history was compacted into a new segment. + Compacted { new_segment_id: SegmentId }, + /// No compaction was run; the message has already been surfaced as an alert. + Skipped { message: String }, +} + impl From for PodRunResult { fn from(r: WorkerResult) -> Self { match r { diff --git a/crates/pod/tests/compact_events_test.rs b/crates/pod/tests/compact_events_test.rs index 3a917412..5c6406ef 100644 --- a/crates/pod/tests/compact_events_test.rs +++ b/crates/pod/tests/compact_events_test.rs @@ -16,11 +16,11 @@ use llm_worker::Worker; use llm_worker::llm_client::event::{Event as LlmEvent, ResponseStatus, StatusEvent}; use llm_worker::llm_client::types::Item; use llm_worker::llm_client::{ClientError, LlmClient, Request}; -use protocol::Event; +use protocol::{Event, Method, RunResult}; use session_store::{FsStore, LogEntry, PodMetadataStore, Store}; use tokio::sync::broadcast; -use pod::Pod; +use pod::{Pod, PodController}; #[derive(Clone)] struct MockClient { @@ -754,3 +754,53 @@ async fn detached_extract_does_not_fork_session_log() { clone carried its own counter" ); } + +#[tokio::test] +async fn controller_compact_method_emits_start_and_done() { + let client = MockClient::new(vec![ + text_events_with_usage("hi", 1000), + write_summary_tool_use_events("manual-summary", "manual compact summary"), + single_text_events("done"), + ]); + let pod = make_pod_with_manifest(POST_RUN_MANIFEST_TOML, client).await; + let runtime_tmp = tempfile::tempdir().unwrap(); + let (handle, _shutdown) = PodController::spawn(pod, runtime_tmp.path()).await.unwrap(); + let mut rx = handle.subscribe(); + + handle + .send(Method::run_text("seed history")) + .await + .expect("send run"); + loop { + match tokio::time::timeout(std::time::Duration::from_secs(2), rx.recv()) + .await + .expect("timeout waiting for run end") + .expect("event") + { + Event::RunEnd { + result: RunResult::Finished, + } => break, + _ => {} + } + } + + handle.send(Method::Compact).await.expect("send compact"); + let mut saw_start = false; + loop { + match tokio::time::timeout(std::time::Duration::from_secs(2), rx.recv()) + .await + .expect("timeout waiting for compact events") + .expect("event") + { + Event::CompactStart => saw_start = true, + Event::CompactDone { .. } => { + break; + } + Event::CompactFailed { error } => panic!("manual compact failed: {error}"), + _ => {} + } + } + + assert!(saw_start, "manual compact should emit CompactStart"); + let _ = handle.send(Method::Shutdown).await; +} diff --git a/crates/protocol/src/lib.rs b/crates/protocol/src/lib.rs index c383d495..74c63de1 100644 --- a/crates/protocol/src/lib.rs +++ b/crates/protocol/src/lib.rs @@ -31,6 +31,11 @@ pub enum Method { /// fresh turn via `Run` (orphan `tool_use` items are closed with a /// synthetic tool result before the new user message is appended). Pause, + /// Request an explicit compaction while the Pod is otherwise idle. + /// + /// This is a typed control method: clients must not send `compact` as a + /// `Method::Run` user message. + Compact, Shutdown, /// Request a list of completion candidates from the Pod. /// @@ -732,6 +737,15 @@ mod tests { assert_eq!(serialized, json); } + #[test] + fn method_compact_roundtrip() { + let json = r#"{"method":"compact"}"#; + let method: Method = serde_json::from_str(json).unwrap(); + assert!(matches!(method, Method::Compact)); + let serialized = serde_json::to_string(&method).unwrap(); + assert_eq!(serialized, json); + } + #[test] fn event_text_delta_format() { let event = Event::TextDelta { diff --git a/crates/tui/src/command.rs b/crates/tui/src/command.rs index ced220e9..71dca178 100644 --- a/crates/tui/src/command.rs +++ b/crates/tui/src/command.rs @@ -138,6 +138,15 @@ impl CommandRegistry { can_execute: always_available, executor: noop_command, }); + registry.register(CommandSpec { + name: "compact", + aliases: &[], + usage: "compact", + description: "Request immediate Pod context compaction.", + argument_parser: compact_args, + can_execute: compact_available, + executor: compact_command, + }); registry } @@ -266,6 +275,34 @@ fn help_args(raw: &str) -> Result { } } +fn compact_args(raw: &str) -> Result { + let args = CommandArgs::parse_whitespace(raw); + if args.argv().is_empty() { + Ok(args) + } else { + Err(CommandDiagnostic::new("Invalid arguments. Usage: compact")) + } +} + +fn compact_available(environment: &CommandEnvironment) -> Result<(), CommandDiagnostic> { + if !environment.connected { + return Err(CommandDiagnostic::new( + "Cannot compact: not connected to a Pod.", + )); + } + if environment.running { + return Err(CommandDiagnostic::new( + "Cannot compact while the Pod is running.", + )); + } + if environment.paused { + return Err(CommandDiagnostic::new( + "Cannot compact while the Pod is paused; resume or start a fresh turn first.", + )); + } + Ok(()) +} + fn help_command(invocation: CommandInvocation<'_>) -> CommandExecution { if let Some(name) = invocation.args.argv().first() { let Some(command) = invocation.registry.find(name) else { @@ -301,6 +338,18 @@ fn noop_command(invocation: CommandInvocation<'_>) -> CommandExecution { CommandExecution::notice("noop: no action") } +fn compact_command(invocation: CommandInvocation<'_>) -> CommandExecution { + let _ = invocation.command; + let _ = invocation.environment; + let _ = invocation.args.raw(); + CommandExecution { + method: Some(Method::Compact), + diagnostics: vec![CommandDiagnostic::new("compact requested")], + exit_command_mode: true, + clear_input: true, + } +} + #[cfg(test)] mod tests { use super::*; @@ -337,4 +386,39 @@ mod tests { assert!(!result.exit_command_mode); assert!(result.diagnostics[0].message.contains("Invalid arguments")); } + + #[test] + fn compact_command_returns_compact_method_not_run() { + let registry = CommandRegistry::builtins(); + let result = registry.dispatch("compact", &env()); + assert!(matches!(result.method, Some(Method::Compact))); + assert!(result.exit_command_mode); + assert!(result.clear_input); + assert!(result.diagnostics[0].message.contains("compact requested")); + } + + #[test] + fn compact_invalid_arguments_are_local_diagnostic() { + let registry = CommandRegistry::builtins(); + let result = registry.dispatch("compact now", &env()); + assert!(result.method.is_none()); + assert!(!result.exit_command_mode); + assert!(result.diagnostics[0].message.contains("Invalid arguments")); + } + + #[test] + fn compact_rejects_running_and_paused_locally() { + let registry = CommandRegistry::builtins(); + let mut running = env(); + running.running = true; + let result = registry.dispatch("compact", &running); + assert!(result.method.is_none()); + assert!(result.diagnostics[0].message.contains("running")); + + let mut paused = env(); + paused.paused = true; + let result = registry.dispatch("compact", &paused); + assert!(result.method.is_none()); + assert!(result.diagnostics[0].message.contains("paused")); + } } diff --git a/crates/tui/src/main.rs b/crates/tui/src/main.rs index 2691f7c1..9072295e 100644 --- a/crates/tui/src/main.rs +++ b/crates/tui/src/main.rs @@ -1234,6 +1234,38 @@ mod tests { })); } + #[test] + fn compact_command_sends_compact_method_without_run() { + let mut app = App::new("agent".to_string()); + app.connected = true; + assert!( + handle_key( + &mut app, + KeyEvent::new(KeyCode::Char(':'), KeyModifiers::NONE) + ) + .is_none() + ); + for c in "compact".chars() { + assert!( + handle_key( + &mut app, + KeyEvent::new(KeyCode::Char(c), KeyModifiers::NONE) + ) + .is_none() + ); + } + + let method = handle_key(&mut app, KeyEvent::new(KeyCode::Enter, KeyModifiers::NONE)); + assert!(matches!(method, Some(protocol::Method::Compact))); + assert!(!app.is_command_mode()); + assert_eq!(input_text(&app), ""); + assert_eq!(app.queued_input_count(), 0); + assert!(app.blocks.iter().any(|block| match block { + crate::block::Block::Alert { message, .. } => message.contains("compact requested"), + _ => false, + })); + } + #[test] fn command_registry_suggestions_are_available() { let mut app = App::new("agent".to_string());