813 lines
26 KiB
Rust
813 lines
26 KiB
Rust
//! Compact lifecycle `Event` broadcasting.
|
|
//!
|
|
//! Covers three compaction-event paths (Pod metadata, auto-fork, extract and controller tests also live in this file):
|
|
//! - `try_pre_run_compact` success → `CompactStart + CompactDone`
|
|
//! - `try_pre_run_compact` failure → `CompactStart + CompactFailed`
|
|
//! - mid-turn `do_compact_and_resume` success → `CompactStart + CompactDone`
|
|
//! (driven by `compact_request_threshold` → `PreRequestAction::Yield`)
|
|
|
|
use std::pin::Pin;
|
|
use std::sync::Arc;
|
|
use std::sync::atomic::{AtomicUsize, Ordering};
|
|
|
|
use async_trait::async_trait;
|
|
use futures::Stream;
|
|
use llm_worker::Worker;
|
|
use llm_worker::llm_client::event::{Event as LlmEvent, ResponseStatus, StatusEvent};
|
|
use llm_worker::llm_client::types::Item;
|
|
use llm_worker::llm_client::{ClientError, LlmClient, Request};
|
|
use pod_store::{CombinedStore, FsPodStore, PodMetadataStore};
|
|
use protocol::{Event, Method, RunResult};
|
|
use session_store::{FsStore, LogEntry, Store};
|
|
use tokio::sync::broadcast;
|
|
|
|
use pod::{Pod, PodController};
|
|
|
|
/// Store type used by every test Pod in this file: a `CombinedStore` pairing
/// `session_store::FsStore` with `pod_store::FsPodStore`.
type TestStore = CombinedStore<FsStore, FsPodStore>;
|
|
|
|
#[derive(Clone)]
|
|
struct MockClient {
|
|
responses: Arc<Vec<Vec<LlmEvent>>>,
|
|
call_count: Arc<AtomicUsize>,
|
|
}
|
|
|
|
impl MockClient {
|
|
fn new(responses: Vec<Vec<LlmEvent>>) -> Self {
|
|
Self {
|
|
responses: Arc::new(responses),
|
|
call_count: Arc::new(AtomicUsize::new(0)),
|
|
}
|
|
}
|
|
}
|
|
|
|
#[async_trait]
|
|
impl LlmClient for MockClient {
|
|
fn clone_boxed(&self) -> Box<dyn LlmClient> {
|
|
Box::new(self.clone())
|
|
}
|
|
|
|
async fn stream(
|
|
&self,
|
|
_request: Request,
|
|
) -> Result<Pin<Box<dyn Stream<Item = Result<LlmEvent, ClientError>> + Send>>, ClientError>
|
|
{
|
|
let count = self.call_count.fetch_add(1, Ordering::SeqCst);
|
|
if count >= self.responses.len() {
|
|
return Err(ClientError::Config("mock client exhausted".into()));
|
|
}
|
|
let events = self.responses[count].clone();
|
|
let stream = futures::stream::iter(events.into_iter().map(Ok));
|
|
Ok(Box::pin(stream))
|
|
}
|
|
}
|
|
|
|
fn single_text_events(text: &str) -> Vec<LlmEvent> {
|
|
vec![
|
|
LlmEvent::text_block_start(0),
|
|
LlmEvent::text_delta(0, text),
|
|
LlmEvent::text_block_stop(0, None),
|
|
LlmEvent::Status(StatusEvent {
|
|
status: ResponseStatus::Completed,
|
|
}),
|
|
]
|
|
}
|
|
|
|
/// `single_text_events` + a UsageEvent so the Pod's `usage_history`
|
|
/// picks up a measurement, which is how `pre_llm_request` decides
|
|
/// whether to yield mid-turn.
|
|
fn text_events_with_usage(text: &str, input_tokens: u64) -> Vec<LlmEvent> {
|
|
vec![
|
|
LlmEvent::text_block_start(0),
|
|
LlmEvent::text_delta(0, text),
|
|
LlmEvent::text_block_stop(0, None),
|
|
LlmEvent::usage(input_tokens, 1),
|
|
LlmEvent::Status(StatusEvent {
|
|
status: ResponseStatus::Completed,
|
|
}),
|
|
]
|
|
}
|
|
|
|
fn write_summary_tool_use_events(call_id: &str, text: &str) -> Vec<LlmEvent> {
|
|
let input = serde_json::json!({ "text": text }).to_string();
|
|
vec![
|
|
LlmEvent::tool_use_start(0, call_id, "write_summary"),
|
|
LlmEvent::tool_input_delta(0, input),
|
|
LlmEvent::tool_use_stop(0),
|
|
LlmEvent::Status(StatusEvent {
|
|
status: ResponseStatus::Completed,
|
|
}),
|
|
]
|
|
}
|
|
|
|
// Default manifest for these tests (see `make_pod`). `compact_threshold = 1`
// is low enough that `try_pre_run_compact` fires the first time it is
// checked after a run.
const POST_RUN_MANIFEST_TOML: &str = r#"
[pod]
name = "test-pod"
pwd = "./"

[model]
scheme = "anthropic"
model_id = "test-model"

[worker]
max_tokens = 100

[compaction]
compact_threshold = 1
compact_retained_tokens = 0

[[scope.allow]]
target = "./"
permission = "write"
"#;
|
|
|
|
// Manifest for the mid-turn compaction path: `compact_request_threshold`
// is what makes the PodInterceptor yield mid-turn. `compact_threshold` is
// deliberately absent so the post-run check stays inert.
const MID_TURN_MANIFEST_TOML: &str = r#"
[pod]
name = "test-pod"
pwd = "./"

[model]
scheme = "anthropic"
model_id = "test-model"

[worker]
max_tokens = 100

[compaction]
compact_request_threshold = 100
compact_retained_tokens = 0

[[scope.allow]]
target = "./"
permission = "write"
"#;
|
|
|
|
async fn make_pod_with_manifest(
|
|
manifest_toml: &str,
|
|
client: MockClient,
|
|
) -> Pod<MockClient, TestStore> {
|
|
let manifest = pod::PodManifest::from_toml(manifest_toml).unwrap();
|
|
|
|
let store_tmp = tempfile::tempdir().unwrap();
|
|
let store = CombinedStore::new(
|
|
FsStore::new(store_tmp.path()).unwrap(),
|
|
FsPodStore::new(store_tmp.path().join("pods")).unwrap(),
|
|
);
|
|
std::mem::forget(store_tmp);
|
|
|
|
let pwd_tmp = tempfile::tempdir().unwrap();
|
|
let pwd = pwd_tmp.path().to_path_buf();
|
|
let scope = pod::Scope::writable(&pwd).unwrap();
|
|
std::mem::forget(pwd_tmp);
|
|
|
|
let worker = Worker::new(client);
|
|
let mut pod = Pod::new(manifest, worker, store, pwd, scope).await.unwrap();
|
|
pod.enable_pod_metadata_write_through().unwrap();
|
|
pod
|
|
}
|
|
|
|
async fn make_pod(client: MockClient) -> Pod<MockClient, TestStore> {
|
|
make_pod_with_manifest(POST_RUN_MANIFEST_TOML, client).await
|
|
}
|
|
|
|
/// Drain whatever events are already queued on `rx`. Non-blocking.
|
|
fn drain(rx: &mut broadcast::Receiver<Event>) -> Vec<Event> {
|
|
let mut out = Vec::new();
|
|
loop {
|
|
match rx.try_recv() {
|
|
Ok(ev) => out.push(ev),
|
|
Err(_) => break,
|
|
}
|
|
}
|
|
out
|
|
}
|
|
|
|
/// Collect every system-message text that the post-compaction
|
|
/// `SegmentStart.history` carries, by reading the sink mirror directly.
|
|
fn system_texts_in_sink_session_start(
|
|
pod: &pod::Pod<
|
|
impl llm_worker::llm_client::client::LlmClient + Clone + 'static,
|
|
impl session_store::Store + Clone + 'static,
|
|
>,
|
|
) -> Vec<String> {
|
|
let (entries, _rx) = pod.sink().subscribe_with_snapshot();
|
|
for entry in entries.into_iter().rev() {
|
|
if let session_store::LogEntry::SegmentStart { history, .. } = entry {
|
|
return history
|
|
.into_iter()
|
|
.filter_map(|logged| {
|
|
let item: Item = logged.into();
|
|
match item {
|
|
Item::Message {
|
|
role: llm_worker::Role::System,
|
|
content,
|
|
..
|
|
} => Some(
|
|
content
|
|
.iter()
|
|
.map(|p| p.as_text().to_owned())
|
|
.collect::<Vec<_>>()
|
|
.join(""),
|
|
),
|
|
_ => None,
|
|
}
|
|
})
|
|
.collect();
|
|
}
|
|
}
|
|
Vec::new()
|
|
}
|
|
|
|
/// Pod metadata starts with a reserved Session and no Segment, then becomes
|
|
/// active once the first SegmentStart is materialized by `run`.
|
|
#[tokio::test]
|
|
async fn pod_metadata_moves_from_pending_to_active_on_first_run() {
|
|
let client = MockClient::new(vec![single_text_events("hi")]);
|
|
let mut pod = make_pod(client).await;
|
|
let store = pod.store().clone();
|
|
let session_id = pod.session_id();
|
|
let initial_segment_id = pod.segment_id();
|
|
|
|
let pending = store
|
|
.read_by_name("test-pod")
|
|
.unwrap()
|
|
.expect("metadata should be initialized at Pod construction");
|
|
assert_eq!(pending.pod_name, "test-pod");
|
|
let pending_active = pending.active.expect("active session pointer missing");
|
|
assert_eq!(pending_active.session_id, session_id);
|
|
assert_eq!(pending_active.segment_id, None);
|
|
|
|
pod.run_text("first").await.unwrap();
|
|
|
|
let resolved = store
|
|
.read_by_name("test-pod")
|
|
.unwrap()
|
|
.expect("metadata should still exist after first run");
|
|
let active = resolved.active.expect("active session pointer missing");
|
|
assert_eq!(active.session_id, session_id);
|
|
assert_eq!(active.segment_id, Some(initial_segment_id));
|
|
}
|
|
|
|
/// Live auto-fork: when another writer extends the segment behind the
|
|
/// Pod's back, the next run's `ensure_segment_head` detects the
|
|
/// entry-count drift and branches into a fresh segment **within the same
|
|
/// Session**. The source segment is left immutable (no terminal marker
|
|
/// written back); the new segment records its parentage forward via
|
|
/// `SegmentStart.forked_from`.
|
|
#[tokio::test]
|
|
async fn concurrent_writer_drift_auto_forks_with_forked_from() {
|
|
// No compaction: keep run → run deterministic so each run consumes
|
|
// exactly one mock response and ensure_segment_head is the only fork
|
|
// trigger.
|
|
const NO_COMPACT_MANIFEST_TOML: &str = r#"
|
|
[pod]
|
|
name = "test-pod"
|
|
pwd = "./"
|
|
|
|
[model]
|
|
scheme = "anthropic"
|
|
model_id = "test-model"
|
|
|
|
[worker]
|
|
max_tokens = 100
|
|
|
|
[[scope.allow]]
|
|
target = "./"
|
|
permission = "write"
|
|
"#;
|
|
let client = MockClient::new(vec![
|
|
single_text_events("first"),
|
|
single_text_events("second"),
|
|
]);
|
|
let mut pod = make_pod_with_manifest(NO_COMPACT_MANIFEST_TOML, client).await;
|
|
|
|
pod.run_text("first").await.unwrap();
|
|
|
|
let store = pod.store().clone();
|
|
let session_id = pod.session_id();
|
|
let source_segment_id = pod.segment_id();
|
|
let source_len_before = store.read_all(session_id, source_segment_id).unwrap().len();
|
|
|
|
// Simulate a foreign writer appending to the same segment. This bumps
|
|
// the on-disk entry count past the Pod's own append tally without
|
|
// updating the Pod's `entries_written`.
|
|
store
|
|
.append(
|
|
session_id,
|
|
source_segment_id,
|
|
&LogEntry::UserInput {
|
|
ts: 9999,
|
|
segments: vec![protocol::Segment::text("interloper")],
|
|
},
|
|
)
|
|
.unwrap();
|
|
|
|
// Next run triggers ensure_segment_head, which sees the drift.
|
|
pod.run_text("second").await.unwrap();
|
|
|
|
// The Pod moved to a new segment in the same Session.
|
|
let new_segment_id = pod.segment_id();
|
|
assert_ne!(new_segment_id, source_segment_id);
|
|
assert_eq!(pod.session_id(), session_id, "auto-fork stays in-Session");
|
|
let metadata = store
|
|
.read_by_name("test-pod")
|
|
.unwrap()
|
|
.expect("metadata should exist after auto-fork");
|
|
let active = metadata.active.expect("active session pointer missing");
|
|
assert_eq!(active.session_id, session_id);
|
|
assert_eq!(active.segment_id, Some(new_segment_id));
|
|
|
|
// New segment records forked_from pointing at the source.
|
|
let new_entries = store.read_all(session_id, new_segment_id).unwrap();
|
|
match &new_entries[0] {
|
|
LogEntry::SegmentStart {
|
|
session_id: seg_session,
|
|
forked_from: Some(origin),
|
|
..
|
|
} => {
|
|
assert_eq!(*seg_session, session_id);
|
|
assert_eq!(origin.segment_id, source_segment_id);
|
|
}
|
|
other => panic!("expected SegmentStart with forked_from, got {other:?}"),
|
|
}
|
|
|
|
// Source segment is unchanged except for the foreign append — the
|
|
// auto-fork wrote no terminal marker back into it.
|
|
let source_after = store.read_all(session_id, source_segment_id).unwrap();
|
|
assert_eq!(source_after.len(), source_len_before + 1);
|
|
assert!(matches!(
|
|
source_after.last(),
|
|
Some(LogEntry::UserInput { .. })
|
|
));
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn compact_emits_session_start_carrying_summary_and_task_snapshot() {
|
|
let client = MockClient::new(vec![
|
|
single_text_events("hi"),
|
|
write_summary_tool_use_events("call-1", "summary"),
|
|
single_text_events("done"),
|
|
]);
|
|
let mut pod = make_pod(client).await;
|
|
|
|
let (tx, _rx_keep) = broadcast::channel::<Event>(64);
|
|
pod.attach_event_tx(tx);
|
|
|
|
pod.run_text("first").await.unwrap();
|
|
let session_id = pod.session_id();
|
|
pod.compact(10_000).await.unwrap();
|
|
let compacted_segment_id = pod.segment_id();
|
|
let metadata = pod
|
|
.store()
|
|
.read_by_name("test-pod")
|
|
.unwrap()
|
|
.expect("metadata should exist after compaction");
|
|
let active = metadata.active.expect("active session pointer missing");
|
|
assert_eq!(active.session_id, session_id);
|
|
assert_eq!(active.segment_id, Some(compacted_segment_id));
|
|
|
|
let system_texts = system_texts_in_sink_session_start(&pod);
|
|
// The post-compaction `SegmentStart.history` carries the new system
|
|
// messages introduced by the compactor. Clients re-seed their view
|
|
// from this entry alone, so it is the load-bearing payload.
|
|
assert!(
|
|
system_texts
|
|
.iter()
|
|
.any(|text| text.starts_with("[Compacted context summary]")),
|
|
"summary system message missing from {system_texts:?}"
|
|
);
|
|
assert!(
|
|
system_texts
|
|
.iter()
|
|
.any(|text| text.starts_with("[Session TaskStore snapshot]")),
|
|
"task snapshot system message missing from {system_texts:?}"
|
|
);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn pre_run_compact_success_broadcasts_start_and_done() {
|
|
// Responses: (1) first run returns short text, (2) compact worker
|
|
// emits write_summary then closes (two LLM calls inside the compact
|
|
// worker: one for write_summary, one that the compact loop consumes
|
|
// as the final "I'm done" close response).
|
|
let client = MockClient::new(vec![
|
|
single_text_events("hi"),
|
|
write_summary_tool_use_events("call-1", "summary"),
|
|
single_text_events("done"),
|
|
]);
|
|
let mut pod = make_pod(client).await;
|
|
|
|
let (tx, mut rx) = broadcast::channel::<Event>(64);
|
|
pod.attach_event_tx(tx);
|
|
|
|
pod.run_text("first").await.unwrap();
|
|
// Drain run events so only compact events remain in `rx`.
|
|
let _ = drain(&mut rx);
|
|
|
|
pod.try_pre_run_compact().await;
|
|
|
|
let events = drain(&mut rx);
|
|
let kinds: Vec<&str> = events
|
|
.iter()
|
|
.map(|e| match e {
|
|
Event::CompactStart => "start",
|
|
Event::CompactDone { .. } => "done",
|
|
Event::CompactFailed { .. } => "failed",
|
|
_ => "other",
|
|
})
|
|
.collect();
|
|
assert!(
|
|
kinds.contains(&"start") && kinds.contains(&"done"),
|
|
"expected CompactStart + CompactDone in {kinds:?}"
|
|
);
|
|
assert!(
|
|
!kinds.contains(&"failed"),
|
|
"unexpected CompactFailed in {kinds:?}"
|
|
);
|
|
|
|
// CompactDone carries the new session id.
|
|
let new_id_in_event = events.iter().find_map(|e| match e {
|
|
Event::CompactDone { new_segment_id } => Some(*new_segment_id),
|
|
_ => None,
|
|
});
|
|
assert!(new_id_in_event.is_some(), "CompactDone missing");
|
|
assert_eq!(new_id_in_event.unwrap(), pod.segment_id());
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn mid_turn_compact_success_broadcasts_start_and_done() {
|
|
// Path: `do_compact_and_resume` via PreRequestAction::Yield.
|
|
//
|
|
// Sequence of LLM calls the mock will serve:
|
|
// [0] first run completes with a UsageEvent(1000 > threshold=100) so
|
|
// the next run's pre_llm_request will yield.
|
|
// [1] compact worker emits `write_summary` tool call.
|
|
// [2] compact worker closes (its final "done" response).
|
|
// [3] resume() after compact makes one more LLM call.
|
|
let client = MockClient::new(vec![
|
|
text_events_with_usage("a", 1000),
|
|
write_summary_tool_use_events("call-1", "summary"),
|
|
single_text_events("done"),
|
|
single_text_events("b"),
|
|
]);
|
|
let mut pod = make_pod_with_manifest(MID_TURN_MANIFEST_TOML, client).await;
|
|
|
|
let (tx, mut rx) = broadcast::channel::<Event>(64);
|
|
pod.attach_event_tx(tx);
|
|
|
|
// First run populates usage_history above the request threshold.
|
|
pod.run_text("first").await.unwrap();
|
|
let _ = drain(&mut rx);
|
|
|
|
// Second run: pre_llm_request yields immediately, Worker returns
|
|
// Yielded, handle_worker_result routes into do_compact_and_resume.
|
|
pod.run_text("second").await.unwrap();
|
|
|
|
let events = drain(&mut rx);
|
|
let kinds: Vec<&str> = events
|
|
.iter()
|
|
.map(|e| match e {
|
|
Event::CompactStart => "start",
|
|
Event::CompactDone { .. } => "done",
|
|
Event::CompactFailed { .. } => "failed",
|
|
_ => "other",
|
|
})
|
|
.collect();
|
|
assert!(
|
|
kinds.contains(&"start") && kinds.contains(&"done"),
|
|
"expected CompactStart + CompactDone in {kinds:?}"
|
|
);
|
|
assert!(
|
|
!kinds.contains(&"failed"),
|
|
"unexpected CompactFailed in {kinds:?}"
|
|
);
|
|
|
|
let new_id_in_event = events.iter().find_map(|e| match e {
|
|
Event::CompactDone { new_segment_id } => Some(*new_segment_id),
|
|
_ => None,
|
|
});
|
|
assert_eq!(new_id_in_event, Some(pod.segment_id()));
|
|
}
|
|
|
|
/// Manifest for the regression test
/// `compact_resets_extract_pointer_so_extract_can_fire_again`: both
/// `extract_threshold` and `compact_threshold` are set to 1.
///
/// The regression: `Pod::compact()` must reset the in-memory
/// `extract_pointer` so extract keeps firing on the new compacted session.
/// If it is not reset, the pointer's `processed_through_history_len` keeps
/// the old (typically large) item count while the compacted session starts
/// with a much shorter history (`[summary, ...]`).
/// `cumulative_input_tokens_since` would then filter out every new usage
/// record — their `history_len` is below the stale pointer — and extract
/// would never fire again for the rest of the process.
const EXTRACT_PLUS_COMPACT_MANIFEST: &str = r#"
[pod]
name = "test-pod"
pwd = "./"

[model]
scheme = "anthropic"
model_id = "test-model"

[worker]
max_tokens = 100

[memory]
extract_threshold = 1

[compaction]
compact_threshold = 1
compact_retained_tokens = 0

[[scope.allow]]
target = "./"
permission = "write"
"#;
|
|
|
|
fn write_extracted_tool_use_events(call_id: &str) -> Vec<LlmEvent> {
|
|
let input = serde_json::json!({
|
|
"decisions": [],
|
|
"discussions": [],
|
|
"attempts": [],
|
|
"requests": []
|
|
})
|
|
.to_string();
|
|
vec![
|
|
LlmEvent::tool_use_start(0, call_id, "write_extracted"),
|
|
LlmEvent::tool_input_delta(0, input),
|
|
LlmEvent::tool_use_stop(0),
|
|
LlmEvent::Status(StatusEvent {
|
|
status: ResponseStatus::Completed,
|
|
}),
|
|
]
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn compact_resets_extract_pointer_so_extract_can_fire_again() {
|
|
// Mock LLM responses, in call order:
|
|
// [0] first run with usage(1000) so extract threshold (=1) fires.
|
|
// [1] extract worker invokes write_extracted with empty payload.
|
|
// [2] extract worker closes after the tool result.
|
|
// [3] compact worker invokes write_summary.
|
|
// [4] compact worker closes after the tool result.
|
|
let client = MockClient::new(vec![
|
|
text_events_with_usage("hi", 1000),
|
|
write_extracted_tool_use_events("ec1"),
|
|
single_text_events("done"),
|
|
write_summary_tool_use_events("sc1", "summary"),
|
|
single_text_events("done"),
|
|
]);
|
|
let mut pod = make_pod_with_manifest(EXTRACT_PLUS_COMPACT_MANIFEST, client).await;
|
|
|
|
pod.run_text("first").await.unwrap();
|
|
|
|
// extract fires; pointer becomes Some.
|
|
pod.try_post_run_extract().await.unwrap();
|
|
assert!(
|
|
pod.extract_pointer().is_some(),
|
|
"extract_pointer should be Some after a successful extract"
|
|
);
|
|
|
|
// Compact runs. Without the fix the in-memory pointer would still
|
|
// reference the old session's history_len.
|
|
pod.try_pre_run_compact().await;
|
|
assert!(
|
|
pod.extract_pointer().is_none(),
|
|
"extract_pointer must be reset to None after compact (matches cold-restore on the new session)"
|
|
);
|
|
}
|
|
|
|
/// Manifest with `extract_threshold = 0`, which is treated as "disabled".
///
/// Without that rule a raw `>=` comparison against `tokens_since` would
/// fire extract after every run regardless of activity. This mirrors the
/// consolidation zero-threshold convention, so users have a single way to
/// opt out without deleting the `[memory]` section.
const EXTRACT_THRESHOLD_ZERO_MANIFEST: &str = r#"
[pod]
name = "test-pod"
pwd = "./"

[model]
scheme = "anthropic"
model_id = "test-model"

[worker]
max_tokens = 100

[memory]
extract_threshold = 0

[[scope.allow]]
target = "./"
permission = "write"
"#;
|
|
|
|
#[tokio::test]
|
|
async fn extract_threshold_zero_is_disabled() {
|
|
// Mock provides exactly one response — the first run. If extract
|
|
// were treated as "fire on any change" because of `tokens_since >= 0`,
|
|
// it would call into the extract worker and exhaust the mock.
|
|
let client = MockClient::new(vec![text_events_with_usage("hi", 1000)]);
|
|
let mut pod = make_pod_with_manifest(EXTRACT_THRESHOLD_ZERO_MANIFEST, client).await;
|
|
|
|
pod.run_text("first").await.unwrap();
|
|
pod.try_post_run_extract()
|
|
.await
|
|
.expect("extract_threshold=0 must skip silently, not fail");
|
|
assert!(
|
|
pod.extract_pointer().is_none(),
|
|
"no extract should have run — pointer must remain None"
|
|
);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn pre_run_compact_failure_broadcasts_start_and_failed() {
|
|
// Only the first run has a response. Compaction will run the
|
|
// compact worker which immediately exhausts the mock → failure.
|
|
let client = MockClient::new(vec![single_text_events("hi")]);
|
|
let mut pod = make_pod(client).await;
|
|
|
|
let (tx, mut rx) = broadcast::channel::<Event>(64);
|
|
pod.attach_event_tx(tx);
|
|
|
|
pod.run_text("first").await.unwrap();
|
|
let _ = drain(&mut rx);
|
|
|
|
// Best-effort: returns Ok(()) even on failure, but emits CompactFailed.
|
|
pod.try_pre_run_compact().await;
|
|
|
|
let events = drain(&mut rx);
|
|
let kinds: Vec<&str> = events
|
|
.iter()
|
|
.map(|e| match e {
|
|
Event::CompactStart => "start",
|
|
Event::CompactDone { .. } => "done",
|
|
Event::CompactFailed { .. } => "failed",
|
|
_ => "other",
|
|
})
|
|
.collect();
|
|
assert!(
|
|
kinds.contains(&"start") && kinds.contains(&"failed"),
|
|
"expected CompactStart + CompactFailed in {kinds:?}"
|
|
);
|
|
assert!(
|
|
!kinds.contains(&"done"),
|
|
"unexpected CompactDone in {kinds:?}"
|
|
);
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Detached post-run memory jobs (`spawn_post_run_memory_jobs` /
|
|
// `wait_for_memory_jobs`). Covers the detach round-trip and the structural
|
|
// invariant that the cloned memory-task Pod shares `SegmentState` with the
|
|
// source Pod, so that `save_extension` from the background extract does not
|
|
// leave the next turn's `save_user_input` looking at a stale session pointer.
|
|
|
|
// Manifest with extract enabled (`extract_threshold = 1`) and no
// `[compaction]` section.
const EXTRACT_NO_COMPACT_MANIFEST: &str = r#"
[pod]
name = "test-pod"
pwd = "./"

[model]
scheme = "anthropic"
model_id = "test-model"

[worker]
max_tokens = 100

[memory]
extract_threshold = 1

[[scope.allow]]
target = "./"
permission = "write"
"#;
|
|
|
|
#[tokio::test]
|
|
async fn extract_large_unprocessed_range_does_not_abort_on_input_occupancy() {
|
|
let client = MockClient::new(vec![
|
|
text_events_with_usage("recorded", 1000),
|
|
write_extracted_tool_use_events("ec-large"),
|
|
single_text_events("done"),
|
|
]);
|
|
let mut pod = make_pod_with_manifest(EXTRACT_NO_COMPACT_MANIFEST, client).await;
|
|
|
|
let large_request = format!("remember this large slice: {}", "x ".repeat(200_000));
|
|
pod.run_text(&large_request).await.unwrap();
|
|
|
|
pod.try_post_run_extract().await.expect(
|
|
"large unprocessed extract ranges must reach the extract worker, not abort locally",
|
|
);
|
|
assert!(
|
|
pod.extract_pointer().is_some(),
|
|
"successful extract should advance the pointer even when the input range is large"
|
|
);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn spawn_and_wait_drives_extract_to_completion() {
|
|
let client = MockClient::new(vec![
|
|
text_events_with_usage("hi", 1000),
|
|
write_extracted_tool_use_events("ec1"),
|
|
single_text_events("done"),
|
|
]);
|
|
let mut pod = make_pod_with_manifest(EXTRACT_NO_COMPACT_MANIFEST, client).await;
|
|
|
|
pod.run_text("first").await.unwrap();
|
|
assert!(
|
|
pod.extract_pointer().is_none(),
|
|
"extract has not run yet — pointer must be None"
|
|
);
|
|
|
|
pod.spawn_post_run_memory_jobs();
|
|
pod.wait_for_memory_jobs().await;
|
|
|
|
assert!(
|
|
pod.extract_pointer().is_some(),
|
|
"spawn + wait must complete extract; pointer should be set"
|
|
);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn detached_extract_does_not_fork_session_log() {
|
|
// Source pod and the cloned memory-task pod share `SegmentState` via
|
|
// `Arc<_>`. The detached extract advances the entry tally through
|
|
// `save_extension`; the next `run` must see that same tally so
|
|
// `ensure_head_or_fork` does not spawn a new session.
|
|
let client = MockClient::new(vec![
|
|
text_events_with_usage("hi", 1000),
|
|
write_extracted_tool_use_events("ec1"),
|
|
single_text_events("done"),
|
|
text_events_with_usage("ok", 1000),
|
|
]);
|
|
let mut pod = make_pod_with_manifest(EXTRACT_NO_COMPACT_MANIFEST, client).await;
|
|
|
|
pod.run_text("first").await.unwrap();
|
|
let session_before = pod.segment_id();
|
|
|
|
pod.spawn_post_run_memory_jobs();
|
|
pod.wait_for_memory_jobs().await;
|
|
|
|
pod.run_text("second").await.unwrap();
|
|
let session_after = pod.segment_id();
|
|
|
|
assert_eq!(
|
|
session_before, session_after,
|
|
"detached extract's save_extension and the next turn's save_user_input \
|
|
must share the entry tally through SegmentState — a fork here means the \
|
|
clone carried its own counter"
|
|
);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn controller_compact_method_emits_start_and_done() {
|
|
let client = MockClient::new(vec![
|
|
text_events_with_usage("hi", 1000),
|
|
write_summary_tool_use_events("manual-summary", "manual compact summary"),
|
|
single_text_events("done"),
|
|
]);
|
|
let pod = make_pod_with_manifest(POST_RUN_MANIFEST_TOML, client).await;
|
|
let runtime_tmp = tempfile::tempdir().unwrap();
|
|
let (handle, _shutdown) = PodController::spawn(pod, runtime_tmp.path()).await.unwrap();
|
|
let mut rx = handle.subscribe();
|
|
|
|
handle
|
|
.send(Method::run_text("seed history"))
|
|
.await
|
|
.expect("send run");
|
|
loop {
|
|
match tokio::time::timeout(std::time::Duration::from_secs(2), rx.recv())
|
|
.await
|
|
.expect("timeout waiting for run end")
|
|
.expect("event")
|
|
{
|
|
Event::RunEnd {
|
|
result: RunResult::Finished,
|
|
} => break,
|
|
_ => {}
|
|
}
|
|
}
|
|
|
|
handle.send(Method::Compact).await.expect("send compact");
|
|
let mut saw_start = false;
|
|
loop {
|
|
match tokio::time::timeout(std::time::Duration::from_secs(2), rx.recv())
|
|
.await
|
|
.expect("timeout waiting for compact events")
|
|
.expect("event")
|
|
{
|
|
Event::CompactStart => saw_start = true,
|
|
Event::CompactDone { .. } => {
|
|
break;
|
|
}
|
|
Event::CompactFailed { error } => panic!("manual compact failed: {error}"),
|
|
_ => {}
|
|
}
|
|
}
|
|
|
|
assert!(saw_start, "manual compact should emit CompactStart");
|
|
let _ = handle.send(Method::Shutdown).await;
|
|
}
|