feat: add manual compact command

This commit is contained in:
Keisuke Hirata 2026-05-24 08:59:44 +09:00
parent ebff9a0293
commit 2109733cb7
No known key found for this signature in database
6 changed files with 348 additions and 2 deletions

View File

@ -732,6 +732,31 @@ async fn controller_loop<C, St>(
} }
} }
Method::Compact => match shared_state.get_status() {
PodStatus::Idle => {
if let Err(error) = pod.manual_compact().await {
let _ = event_tx.send(Event::Error {
code: worker_error_code(&error),
message: error.to_string(),
});
}
}
PodStatus::Paused => {
let _ = event_tx.send(Event::Error {
code: ErrorCode::InvalidRequest,
message: "Cannot compact while the Pod is paused; resume or start a fresh turn first"
.into(),
});
}
PodStatus::Running => {
let _ = event_tx.send(Event::Error {
code: ErrorCode::AlreadyRunning,
message: "Pod is already executing a turn; compact can only run while idle"
.into(),
});
}
},
Method::Shutdown => { Method::Shutdown => {
let _ = event_tx.send(Event::Shutdown); let _ = event_tx.send(Event::Shutdown);
break; break;
@ -965,6 +990,13 @@ where
message: "Pod is already executing a turn".into(), message: "Pod is already executing a turn".into(),
}); });
} }
Some(Method::Compact) => {
let _ = event_tx.send(Event::Error {
code: ErrorCode::AlreadyRunning,
message: "Pod is already executing a turn; compact can only run while idle"
.into(),
});
}
Some(Method::Notify { message }) => { Some(Method::Notify { message }) => {
// Live echo arrives via `Event::SystemItem` once // Live echo arrives via `Event::SystemItem` once
// the in-flight turn's next `pre_llm_request` // the in-flight turn's next `pre_llm_request`
@ -1320,4 +1352,46 @@ mod tests {
"expected no PodEvent for notification-originated worker error" "expected no PodEvent for notification-originated worker error"
); );
} }
#[tokio::test]
async fn compact_method_is_rejected_while_running() {
let mut env = make_env().await;
let mut events = env.event_tx.subscribe();
env._method_tx
.send(Method::Compact)
.await
.expect("send compact");
let pod_future = async {
tokio::time::sleep(Duration::from_millis(50)).await;
Ok::<_, PodError>(PodRunResult::Finished)
};
let (status, shutdown) = drive_turn(
pod_future,
&mut env.method_rx,
&env.event_tx,
&env.cancel_tx,
&env.shared_state,
&env.notify_buffer,
Some(&env.parent_socket_path),
"child-pod",
&env.spawned_registry,
false,
)
.await;
assert_eq!(status, PodStatus::Idle);
assert!(!shutdown);
let event = tokio::time::timeout(Duration::from_secs(1), events.recv())
.await
.expect("event timeout")
.expect("event");
match event {
Event::Error { code, message } => {
assert_eq!(code, ErrorCode::AlreadyRunning);
assert!(message.contains("compact"), "got message: {message}");
}
other => panic!("expected compact rejection error, got {other:?}"),
}
}
} }

View File

@ -2008,6 +2008,86 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
} }
} }
/// Run an explicit user-requested compaction between turns.
///
/// The controller only calls this while Idle. Paused turns keep their
/// interrupted Worker state intact and are intentionally rejected before
/// this method is reached.
pub async fn manual_compact(&mut self) -> Result<ManualCompactResult, PodError> {
if self.manifest.compaction.is_none() {
let message =
"manual compact is unavailable because [compaction] is not configured".to_string();
self.alert(AlertLevel::Warn, AlertSource::Compactor, message.clone());
return Ok(ManualCompactResult::Skipped { message });
}
if self.history().is_empty() {
let message = "manual compact skipped: no conversation history to compact".to_string();
self.alert(AlertLevel::Warn, AlertSource::Compactor, message.clone());
return Ok(ManualCompactResult::Skipped { message });
}
self.ensure_interceptor_installed();
self.cleanup_finished_memory_task();
self.ensure_segment_head()?;
let state = self.compact_state.clone();
if state.as_ref().is_some_and(|s| s.is_disabled()) {
let message =
"manual compact is disabled after repeated compaction failures".to_string();
self.alert(AlertLevel::Warn, AlertSource::Compactor, message.clone());
return Ok(ManualCompactResult::Skipped { message });
}
let retained = state
.as_ref()
.map(|s| s.retained_tokens())
.or_else(|| {
self.manifest
.compaction
.as_ref()
.map(|c| c.compact_retained_tokens)
})
.unwrap_or(manifest::defaults::COMPACT_RETAINED_TOKENS);
let current_tokens = self.total_tokens().tokens;
let cut = self.split_for_retained(retained);
if cut.index == 0 {
let message = format!(
"manual compact skipped: current context is within the retained tail ({current_tokens} <= {retained} tokens)"
);
self.alert(AlertLevel::Warn, AlertSource::Compactor, message.clone());
return Ok(ManualCompactResult::Skipped { message });
}
self.join_memory_task().await;
self.send_event(Event::CompactStart);
match self.compact(retained).await {
Ok(new_segment_id) => {
info!(new_segment_id = %new_segment_id, "Manual compaction succeeded");
self.send_event(Event::CompactDone { new_segment_id });
if let Some(ref state) = state {
state.record_compact_success();
}
Ok(ManualCompactResult::Compacted { new_segment_id })
}
Err(e) => {
warn!(error = %e, "Manual compaction failed");
self.send_event(Event::CompactFailed {
error: e.to_string(),
});
self.alert(
AlertLevel::Error,
AlertSource::Compactor,
format!("manual compaction failed: {e}"),
);
if let Some(ref state) = state {
state.record_compact_failure();
}
Err(e)
}
}
}
/// Persist delta + turn end + outcome after a run/resume. /// Persist delta + turn end + outcome after a run/resume.
async fn persist_turn( async fn persist_turn(
&mut self, &mut self,
@ -3307,6 +3387,15 @@ pub enum PodRunResult {
RolledBack, RolledBack,
} }
/// Result of a manual compaction request.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ManualCompactResult {
/// The history was compacted into a new segment.
Compacted { new_segment_id: SegmentId },
/// No compaction was run; the message has already been surfaced as an alert.
Skipped { message: String },
}
impl From<WorkerResult> for PodRunResult { impl From<WorkerResult> for PodRunResult {
fn from(r: WorkerResult) -> Self { fn from(r: WorkerResult) -> Self {
match r { match r {

View File

@ -16,11 +16,11 @@ use llm_worker::Worker;
use llm_worker::llm_client::event::{Event as LlmEvent, ResponseStatus, StatusEvent}; use llm_worker::llm_client::event::{Event as LlmEvent, ResponseStatus, StatusEvent};
use llm_worker::llm_client::types::Item; use llm_worker::llm_client::types::Item;
use llm_worker::llm_client::{ClientError, LlmClient, Request}; use llm_worker::llm_client::{ClientError, LlmClient, Request};
use protocol::Event; use protocol::{Event, Method, RunResult};
use session_store::{FsStore, LogEntry, PodMetadataStore, Store}; use session_store::{FsStore, LogEntry, PodMetadataStore, Store};
use tokio::sync::broadcast; use tokio::sync::broadcast;
use pod::Pod; use pod::{Pod, PodController};
#[derive(Clone)] #[derive(Clone)]
struct MockClient { struct MockClient {
@ -754,3 +754,56 @@ async fn detached_extract_does_not_fork_session_log() {
clone carried its own counter" clone carried its own counter"
); );
} }
#[tokio::test]
async fn controller_compact_method_emits_start_and_done() {
let client = MockClient::new(vec![
text_events_with_usage("hi", 1000),
write_summary_tool_use_events("manual-summary", "manual compact summary"),
single_text_events("done"),
]);
let pod = make_pod_with_manifest(POST_RUN_MANIFEST_TOML, client).await;
let runtime_tmp = tempfile::tempdir().unwrap();
let (handle, _shutdown) = PodController::spawn(pod, runtime_tmp.path()).await.unwrap();
let mut rx = handle.subscribe();
handle
.send(Method::run_text("seed history"))
.await
.expect("send run");
loop {
match tokio::time::timeout(std::time::Duration::from_secs(2), rx.recv())
.await
.expect("timeout waiting for run end")
.expect("event")
{
Event::RunEnd {
result: RunResult::Finished,
} => break,
_ => {}
}
}
handle.send(Method::Compact).await.expect("send compact");
let mut saw_start = false;
let mut saw_done = false;
loop {
match tokio::time::timeout(std::time::Duration::from_secs(2), rx.recv())
.await
.expect("timeout waiting for compact events")
.expect("event")
{
Event::CompactStart => saw_start = true,
Event::CompactDone { .. } => {
saw_done = true;
break;
}
Event::CompactFailed { error } => panic!("manual compact failed: {error}"),
_ => {}
}
}
assert!(saw_start, "manual compact should emit CompactStart");
assert!(saw_done, "manual compact should emit CompactDone");
let _ = handle.send(Method::Shutdown).await;
}

View File

@ -31,6 +31,11 @@ pub enum Method {
/// fresh turn via `Run` (orphan `tool_use` items are closed with a /// fresh turn via `Run` (orphan `tool_use` items are closed with a
/// synthetic tool result before the new user message is appended). /// synthetic tool result before the new user message is appended).
Pause, Pause,
/// Request an explicit compaction while the Pod is otherwise idle.
///
/// This is a typed control method: clients must not send `compact` as a
/// `Method::Run` user message.
Compact,
Shutdown, Shutdown,
/// Request a list of completion candidates from the Pod. /// Request a list of completion candidates from the Pod.
/// ///
@ -732,6 +737,15 @@ mod tests {
assert_eq!(serialized, json); assert_eq!(serialized, json);
} }
#[test]
fn method_compact_roundtrip() {
let json = r#"{"method":"compact"}"#;
let method: Method = serde_json::from_str(json).unwrap();
assert!(matches!(method, Method::Compact));
let serialized = serde_json::to_string(&method).unwrap();
assert_eq!(serialized, json);
}
#[test] #[test]
fn event_text_delta_format() { fn event_text_delta_format() {
let event = Event::TextDelta { let event = Event::TextDelta {

View File

@ -138,6 +138,15 @@ impl CommandRegistry {
can_execute: always_available, can_execute: always_available,
executor: noop_command, executor: noop_command,
}); });
registry.register(CommandSpec {
name: "compact",
aliases: &[],
usage: "compact",
description: "Request immediate Pod context compaction.",
argument_parser: compact_args,
can_execute: compact_available,
executor: compact_command,
});
registry registry
} }
@ -266,6 +275,34 @@ fn help_args(raw: &str) -> Result<CommandArgs, CommandDiagnostic> {
} }
} }
fn compact_args(raw: &str) -> Result<CommandArgs, CommandDiagnostic> {
let args = CommandArgs::parse_whitespace(raw);
if args.argv().is_empty() {
Ok(args)
} else {
Err(CommandDiagnostic::new("Invalid arguments. Usage: compact"))
}
}
fn compact_available(environment: &CommandEnvironment) -> Result<(), CommandDiagnostic> {
if !environment.connected {
return Err(CommandDiagnostic::new(
"Cannot compact: not connected to a Pod.",
));
}
if environment.running {
return Err(CommandDiagnostic::new(
"Cannot compact while the Pod is running.",
));
}
if environment.paused {
return Err(CommandDiagnostic::new(
"Cannot compact while the Pod is paused; resume or start a fresh turn first.",
));
}
Ok(())
}
fn help_command(invocation: CommandInvocation<'_>) -> CommandExecution { fn help_command(invocation: CommandInvocation<'_>) -> CommandExecution {
if let Some(name) = invocation.args.argv().first() { if let Some(name) = invocation.args.argv().first() {
let Some(command) = invocation.registry.find(name) else { let Some(command) = invocation.registry.find(name) else {
@ -301,6 +338,18 @@ fn noop_command(invocation: CommandInvocation<'_>) -> CommandExecution {
CommandExecution::notice("noop: no action") CommandExecution::notice("noop: no action")
} }
fn compact_command(invocation: CommandInvocation<'_>) -> CommandExecution {
let _ = invocation.command;
let _ = invocation.environment;
let _ = invocation.args.raw();
CommandExecution {
method: Some(Method::Compact),
diagnostics: vec![CommandDiagnostic::new("compact requested")],
exit_command_mode: true,
clear_input: true,
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
@ -337,4 +386,39 @@ mod tests {
assert!(!result.exit_command_mode); assert!(!result.exit_command_mode);
assert!(result.diagnostics[0].message.contains("Invalid arguments")); assert!(result.diagnostics[0].message.contains("Invalid arguments"));
} }
#[test]
fn compact_command_returns_compact_method_not_run() {
let registry = CommandRegistry::builtins();
let result = registry.dispatch("compact", &env());
assert!(matches!(result.method, Some(Method::Compact)));
assert!(result.exit_command_mode);
assert!(result.clear_input);
assert!(result.diagnostics[0].message.contains("compact requested"));
}
#[test]
fn compact_invalid_arguments_are_local_diagnostic() {
let registry = CommandRegistry::builtins();
let result = registry.dispatch("compact now", &env());
assert!(result.method.is_none());
assert!(!result.exit_command_mode);
assert!(result.diagnostics[0].message.contains("Invalid arguments"));
}
#[test]
fn compact_rejects_running_and_paused_locally() {
let registry = CommandRegistry::builtins();
let mut running = env();
running.running = true;
let result = registry.dispatch("compact", &running);
assert!(result.method.is_none());
assert!(result.diagnostics[0].message.contains("running"));
let mut paused = env();
paused.paused = true;
let result = registry.dispatch("compact", &paused);
assert!(result.method.is_none());
assert!(result.diagnostics[0].message.contains("paused"));
}
} }

View File

@ -1234,6 +1234,38 @@ mod tests {
})); }));
} }
#[test]
fn compact_command_sends_compact_method_without_run() {
let mut app = App::new("agent".to_string());
app.connected = true;
assert!(
handle_key(
&mut app,
KeyEvent::new(KeyCode::Char(':'), KeyModifiers::NONE)
)
.is_none()
);
for c in "compact".chars() {
assert!(
handle_key(
&mut app,
KeyEvent::new(KeyCode::Char(c), KeyModifiers::NONE)
)
.is_none()
);
}
let method = handle_key(&mut app, KeyEvent::new(KeyCode::Enter, KeyModifiers::NONE));
assert!(matches!(method, Some(protocol::Method::Compact)));
assert!(!app.is_command_mode());
assert_eq!(input_text(&app), "");
assert_eq!(app.queued_input_count(), 0);
assert!(app.blocks.iter().any(|block| match block {
crate::block::Block::Alert { message, .. } => message.contains("compact requested"),
_ => false,
}));
}
#[test] #[test]
fn command_registry_suggestions_are_available() { fn command_registry_suggestions_are_available() {
let mut app = App::new("agent".to_string()); let mut app = App::new("agent".to_string());