merge: pod-state-write-points
This commit is contained in:
commit
7f6e3b949f
|
|
@ -9,8 +9,8 @@ use llm_worker::llm_client::client::LlmClient;
|
||||||
use llm_worker::state::Mutable;
|
use llm_worker::state::Mutable;
|
||||||
use llm_worker::{ToolOutputLimits, UsageRecord, Worker, WorkerError, WorkerResult};
|
use llm_worker::{ToolOutputLimits, UsageRecord, Worker, WorkerError, WorkerResult};
|
||||||
use session_store::{
|
use session_store::{
|
||||||
LogEntry, PodScopeSnapshot, SegmentId, SessionId, Store, StoreError, SystemItem, segment_log,
|
LogEntry, PodActiveSegmentRef, PodMetadata, PodMetadataStore, PodScopeSnapshot, SegmentId,
|
||||||
to_logged,
|
SessionId, Store, StoreError, SystemItem, segment_log, to_logged,
|
||||||
};
|
};
|
||||||
use tracing::{info, warn};
|
use tracing::{info, warn};
|
||||||
|
|
||||||
|
|
@ -50,6 +50,16 @@ pub struct SegmentLocation {
|
||||||
pub segment_id: SegmentId,
|
pub segment_id: SegmentId,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type PodMetadataWriter = Arc<dyn Fn(PodMetadata) -> Result<(), StoreError> + Send + Sync>;
|
||||||
|
|
||||||
|
fn pod_metadata_writer_for_store<St>(store: &St) -> PodMetadataWriter
|
||||||
|
where
|
||||||
|
St: PodMetadataStore + Clone + Send + Sync + 'static,
|
||||||
|
{
|
||||||
|
let store = store.clone();
|
||||||
|
Arc::new(move |metadata| store.write(&metadata))
|
||||||
|
}
|
||||||
|
|
||||||
/// Lock-free shared session/segment pointer.
|
/// Lock-free shared session/segment pointer.
|
||||||
///
|
///
|
||||||
/// Holds the current `(SessionId, SegmentId)` pair and the append tally
|
/// Holds the current `(SessionId, SegmentId)` pair and the append tally
|
||||||
|
|
@ -181,6 +191,10 @@ pub struct Pod<C: LlmClient, St: Store> {
|
||||||
/// Always `Some` outside of `run()`/`resume()`.
|
/// Always `Some` outside of `run()`/`resume()`.
|
||||||
worker: Option<Worker<C, Mutable>>,
|
worker: Option<Worker<C, Mutable>>,
|
||||||
store: St,
|
store: St,
|
||||||
|
/// Optional write-through hook for name-keyed Pod metadata. Production
|
||||||
|
/// constructors install this from the same FsStore that owns the session
|
||||||
|
/// logs; low-level `Pod::new` tests leave it absent.
|
||||||
|
pod_metadata_writer: Option<PodMetadataWriter>,
|
||||||
/// Shared session pointer. Source of truth for the Pod's current
|
/// Shared session pointer. Source of truth for the Pod's current
|
||||||
/// `segment_id` and append tally. `self.segment_id()` is a thin
|
/// `segment_id` and append tally. `self.segment_id()` is a thin
|
||||||
/// wrapper over `segment_state.segment_id()`.
|
/// wrapper over `segment_state.segment_id()`.
|
||||||
|
|
@ -358,6 +372,7 @@ impl<C: LlmClient + Clone + 'static, St: Store + Clone + 'static> Pod<C, St> {
|
||||||
manifest: self.manifest.clone(),
|
manifest: self.manifest.clone(),
|
||||||
worker: Some(worker),
|
worker: Some(worker),
|
||||||
store: self.store.clone(),
|
store: self.store.clone(),
|
||||||
|
pod_metadata_writer: None,
|
||||||
segment_state: self.segment_state.clone(),
|
segment_state: self.segment_state.clone(),
|
||||||
pwd: self.pwd.clone(),
|
pwd: self.pwd.clone(),
|
||||||
scope: self.scope.clone(),
|
scope: self.scope.clone(),
|
||||||
|
|
@ -499,6 +514,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
||||||
manifest,
|
manifest,
|
||||||
worker: Some(worker),
|
worker: Some(worker),
|
||||||
store,
|
store,
|
||||||
|
pod_metadata_writer: None,
|
||||||
segment_state: SegmentState::new(session_id, segment_id, 0),
|
segment_state: SegmentState::new(session_id, segment_id, 0),
|
||||||
pwd,
|
pwd,
|
||||||
scope: SharedScope::new(scope),
|
scope: SharedScope::new(scope),
|
||||||
|
|
@ -716,6 +732,41 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
||||||
&self.store
|
&self.store
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn write_pod_metadata_pending(&self) -> Result<(), StoreError> {
|
||||||
|
let Some(writer) = &self.pod_metadata_writer else {
|
||||||
|
return Ok(());
|
||||||
|
};
|
||||||
|
writer(PodMetadata::new(
|
||||||
|
self.manifest.pod.name.clone(),
|
||||||
|
Some(PodActiveSegmentRef::pending_segment(self.session_id())),
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn write_pod_metadata_active(&self, loc: SegmentLocation) -> Result<(), StoreError> {
|
||||||
|
let Some(writer) = &self.pod_metadata_writer else {
|
||||||
|
return Ok(());
|
||||||
|
};
|
||||||
|
writer(PodMetadata::new(
|
||||||
|
self.manifest.pod.name.clone(),
|
||||||
|
Some(PodActiveSegmentRef::active_segment(
|
||||||
|
loc.session_id,
|
||||||
|
loc.segment_id,
|
||||||
|
)),
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Enable name-keyed Pod metadata write-through for Pods built through
|
||||||
|
/// the low-level constructor. High-level manifest constructors enable it
|
||||||
|
/// automatically; this hook lets tests and custom embedders opt into the
|
||||||
|
/// same persistence behavior without changing `Pod::new`'s minimal bounds.
|
||||||
|
pub fn enable_pod_metadata_write_through(&mut self) -> Result<(), StoreError>
|
||||||
|
where
|
||||||
|
St: PodMetadataStore + Clone + Send + Sync + 'static,
|
||||||
|
{
|
||||||
|
self.pod_metadata_writer = Some(pod_metadata_writer_for_store(&self.store));
|
||||||
|
self.write_pod_metadata_pending()
|
||||||
|
}
|
||||||
|
|
||||||
/// Current history items held by the underlying Worker.
|
/// Current history items held by the underlying Worker.
|
||||||
pub fn history(&self) -> &[Item] {
|
pub fn history(&self) -> &[Item] {
|
||||||
self.worker().history()
|
self.worker().history()
|
||||||
|
|
@ -1660,6 +1711,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
||||||
};
|
};
|
||||||
self.commit_entry(initial)?;
|
self.commit_entry(initial)?;
|
||||||
self.persist_scope_snapshot()?;
|
self.persist_scope_snapshot()?;
|
||||||
|
self.write_pod_metadata_active(loc)?;
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
// Check store count + auto-fork if it drifted.
|
// Check store count + auto-fork if it drifted.
|
||||||
|
|
@ -1703,6 +1755,10 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
||||||
if self.scope_allocation.is_some() {
|
if self.scope_allocation.is_some() {
|
||||||
pod_registry::update_segment(&self.manifest.pod.name, fork_segment_id)?;
|
pod_registry::update_segment(&self.manifest.pod.name, fork_segment_id)?;
|
||||||
}
|
}
|
||||||
|
self.write_pod_metadata_active(SegmentLocation {
|
||||||
|
session_id: loc.session_id,
|
||||||
|
segment_id: fork_segment_id,
|
||||||
|
})?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -2220,6 +2276,10 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
||||||
if self.scope_allocation.is_some() {
|
if self.scope_allocation.is_some() {
|
||||||
pod_registry::update_segment(&self.manifest.pod.name, new_segment_id)?;
|
pod_registry::update_segment(&self.manifest.pod.name, new_segment_id)?;
|
||||||
}
|
}
|
||||||
|
self.write_pod_metadata_active(SegmentLocation {
|
||||||
|
session_id: old_loc.session_id,
|
||||||
|
segment_id: new_segment_id,
|
||||||
|
})?;
|
||||||
// Align user_segments with the post-compaction history. Items
|
// Align user_segments with the post-compaction history. Items
|
||||||
// before `retain_from` (now folded into the summary) lose their
|
// before `retain_from` (now folded into the summary) lose their
|
||||||
// segments; only the user_messages surviving in retained_items
|
// segments; only the user_messages surviving in retained_items
|
||||||
|
|
@ -2760,7 +2820,10 @@ enum ConsolidateDecision {
|
||||||
Completed,
|
Completed,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<St: Store> Pod<Box<dyn LlmClient>, St> {
|
impl<St> Pod<Box<dyn LlmClient>, St>
|
||||||
|
where
|
||||||
|
St: Store + PodMetadataStore + Clone + Send + Sync + 'static,
|
||||||
|
{
|
||||||
/// Create a Pod entirely from a validated manifest.
|
/// Create a Pod entirely from a validated manifest.
|
||||||
///
|
///
|
||||||
/// The Pod's working directory is captured once here from the
|
/// The Pod's working directory is captured once here from the
|
||||||
|
|
@ -2808,11 +2871,13 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
|
||||||
let mut worker = Worker::new(common.client);
|
let mut worker = Worker::new(common.client);
|
||||||
apply_worker_manifest(&mut worker, &manifest.worker);
|
apply_worker_manifest(&mut worker, &manifest.worker);
|
||||||
worker.set_cache_key(Some(segment_id.to_string()));
|
worker.set_cache_key(Some(segment_id.to_string()));
|
||||||
|
let pod_metadata_writer = Some(pod_metadata_writer_for_store(&store));
|
||||||
|
|
||||||
let mut pod = Self {
|
let mut pod = Self {
|
||||||
manifest,
|
manifest,
|
||||||
worker: Some(worker),
|
worker: Some(worker),
|
||||||
store,
|
store,
|
||||||
|
pod_metadata_writer,
|
||||||
segment_state: SegmentState::new(session_id, segment_id, 0),
|
segment_state: SegmentState::new(session_id, segment_id, 0),
|
||||||
pwd: common.pwd,
|
pwd: common.pwd,
|
||||||
scope: SharedScope::new(common.scope),
|
scope: SharedScope::new(common.scope),
|
||||||
|
|
@ -2847,6 +2912,7 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
|
||||||
};
|
};
|
||||||
pod.apply_permissions_from_manifest();
|
pod.apply_permissions_from_manifest();
|
||||||
pod.apply_prune_from_manifest();
|
pod.apply_prune_from_manifest();
|
||||||
|
pod.write_pod_metadata_pending()?;
|
||||||
drain_skill_shadows(&pod, skill_shadows);
|
drain_skill_shadows(&pod, skill_shadows);
|
||||||
Ok(pod)
|
Ok(pod)
|
||||||
}
|
}
|
||||||
|
|
@ -2881,11 +2947,13 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
|
||||||
let mut worker = Worker::new(common.client);
|
let mut worker = Worker::new(common.client);
|
||||||
apply_worker_manifest(&mut worker, &manifest.worker);
|
apply_worker_manifest(&mut worker, &manifest.worker);
|
||||||
worker.set_cache_key(Some(segment_id.to_string()));
|
worker.set_cache_key(Some(segment_id.to_string()));
|
||||||
|
let pod_metadata_writer = Some(pod_metadata_writer_for_store(&store));
|
||||||
|
|
||||||
let mut pod = Self {
|
let mut pod = Self {
|
||||||
manifest,
|
manifest,
|
||||||
worker: Some(worker),
|
worker: Some(worker),
|
||||||
store,
|
store,
|
||||||
|
pod_metadata_writer,
|
||||||
segment_state: SegmentState::new(session_id, segment_id, 0),
|
segment_state: SegmentState::new(session_id, segment_id, 0),
|
||||||
pwd: common.pwd,
|
pwd: common.pwd,
|
||||||
scope: SharedScope::new(common.scope),
|
scope: SharedScope::new(common.scope),
|
||||||
|
|
@ -2920,10 +2988,41 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
|
||||||
};
|
};
|
||||||
pod.apply_permissions_from_manifest();
|
pod.apply_permissions_from_manifest();
|
||||||
pod.apply_prune_from_manifest();
|
pod.apply_prune_from_manifest();
|
||||||
|
pod.write_pod_metadata_pending()?;
|
||||||
drain_skill_shadows(&pod, skill_shadows);
|
drain_skill_shadows(&pod, skill_shadows);
|
||||||
Ok(pod)
|
Ok(pod)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Restore a Pod by resolving its name-keyed metadata to an active
|
||||||
|
/// `(SessionId, SegmentId)` and then using the normal session-log restore
|
||||||
|
/// path. The metadata stores only the active pointer; lineage and origin
|
||||||
|
/// remain authoritative in the session log.
|
||||||
|
pub async fn restore_from_pod_metadata(
|
||||||
|
pod_name: &str,
|
||||||
|
manifest: PodManifest,
|
||||||
|
store: St,
|
||||||
|
loader: PromptLoader,
|
||||||
|
) -> Result<Self, PodError> {
|
||||||
|
let metadata =
|
||||||
|
store
|
||||||
|
.read_by_name(pod_name)?
|
||||||
|
.ok_or_else(|| PodError::PodMetadataMissing {
|
||||||
|
pod_name: pod_name.to_string(),
|
||||||
|
})?;
|
||||||
|
let active = metadata
|
||||||
|
.active
|
||||||
|
.ok_or_else(|| PodError::PodMetadataInactive {
|
||||||
|
pod_name: pod_name.to_string(),
|
||||||
|
})?;
|
||||||
|
let segment_id = active
|
||||||
|
.segment_id
|
||||||
|
.ok_or_else(|| PodError::PodMetadataPending {
|
||||||
|
pod_name: pod_name.to_string(),
|
||||||
|
session_id: active.session_id,
|
||||||
|
})?;
|
||||||
|
Self::restore_from_manifest(active.session_id, segment_id, manifest, store, loader).await
|
||||||
|
}
|
||||||
|
|
||||||
/// Restore a Pod from an existing session log.
|
/// Restore a Pod from an existing session log.
|
||||||
///
|
///
|
||||||
/// Resolves the manifest cascade exactly like [`Self::from_manifest`]
|
/// Resolves the manifest cascade exactly like [`Self::from_manifest`]
|
||||||
|
|
@ -3021,11 +3120,13 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
|
||||||
|
|
||||||
let extract_pointer = memory::extract::fold_pointer(&state.extensions);
|
let extract_pointer = memory::extract::fold_pointer(&state.extensions);
|
||||||
let task_store = tools::TaskStore::from_history(&state.history);
|
let task_store = tools::TaskStore::from_history(&state.history);
|
||||||
|
let pod_metadata_writer = Some(pod_metadata_writer_for_store(&store));
|
||||||
|
|
||||||
let mut pod = Self {
|
let mut pod = Self {
|
||||||
manifest,
|
manifest,
|
||||||
worker: Some(worker),
|
worker: Some(worker),
|
||||||
store,
|
store,
|
||||||
|
pod_metadata_writer,
|
||||||
segment_state: SegmentState::new(session_id, segment_id, state.entries_count),
|
segment_state: SegmentState::new(session_id, segment_id, state.entries_count),
|
||||||
pwd: common.pwd,
|
pwd: common.pwd,
|
||||||
scope: SharedScope::new(common.scope),
|
scope: SharedScope::new(common.scope),
|
||||||
|
|
@ -3065,6 +3166,10 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
|
||||||
};
|
};
|
||||||
pod.apply_permissions_from_manifest();
|
pod.apply_permissions_from_manifest();
|
||||||
pod.apply_prune_from_manifest();
|
pod.apply_prune_from_manifest();
|
||||||
|
pod.write_pod_metadata_active(SegmentLocation {
|
||||||
|
session_id,
|
||||||
|
segment_id,
|
||||||
|
})?;
|
||||||
drain_skill_shadows(&pod, skill_shadows);
|
drain_skill_shadows(&pod, skill_shadows);
|
||||||
Ok(pod)
|
Ok(pod)
|
||||||
}
|
}
|
||||||
|
|
@ -3293,6 +3398,20 @@ pub enum PodError {
|
||||||
"session {segment_id} has no persisted scope snapshot; refusing resume without explicit scope"
|
"session {segment_id} has no persisted scope snapshot; refusing resume without explicit scope"
|
||||||
)]
|
)]
|
||||||
SegmentScopeMissing { segment_id: SegmentId },
|
SegmentScopeMissing { segment_id: SegmentId },
|
||||||
|
|
||||||
|
#[error("pod metadata for {pod_name} was not found")]
|
||||||
|
PodMetadataMissing { pod_name: String },
|
||||||
|
|
||||||
|
#[error("pod metadata for {pod_name} has no active session")]
|
||||||
|
PodMetadataInactive { pod_name: String },
|
||||||
|
|
||||||
|
#[error(
|
||||||
|
"pod metadata for {pod_name} points to session {session_id} but no segment is materialized yet"
|
||||||
|
)]
|
||||||
|
PodMetadataPending {
|
||||||
|
pod_name: String,
|
||||||
|
session_id: SessionId,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Bundle of resources that every high-level Pod constructor needs:
|
/// Bundle of resources that every high-level Pod constructor needs:
|
||||||
|
|
|
||||||
|
|
@ -17,7 +17,7 @@ use llm_worker::llm_client::event::{Event as LlmEvent, ResponseStatus, StatusEve
|
||||||
use llm_worker::llm_client::types::Item;
|
use llm_worker::llm_client::types::Item;
|
||||||
use llm_worker::llm_client::{ClientError, LlmClient, Request};
|
use llm_worker::llm_client::{ClientError, LlmClient, Request};
|
||||||
use protocol::Event;
|
use protocol::Event;
|
||||||
use session_store::{FsStore, LogEntry, Store};
|
use session_store::{FsStore, LogEntry, PodMetadataStore, Store};
|
||||||
use tokio::sync::broadcast;
|
use tokio::sync::broadcast;
|
||||||
|
|
||||||
use pod::Pod;
|
use pod::Pod;
|
||||||
|
|
@ -158,7 +158,9 @@ async fn make_pod_with_manifest(
|
||||||
std::mem::forget(pwd_tmp);
|
std::mem::forget(pwd_tmp);
|
||||||
|
|
||||||
let worker = Worker::new(client);
|
let worker = Worker::new(client);
|
||||||
Pod::new(manifest, worker, store, pwd, scope).await.unwrap()
|
let mut pod = Pod::new(manifest, worker, store, pwd, scope).await.unwrap();
|
||||||
|
pod.enable_pod_metadata_write_through().unwrap();
|
||||||
|
pod
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn make_pod(client: MockClient) -> Pod<MockClient, FsStore> {
|
async fn make_pod(client: MockClient) -> Pod<MockClient, FsStore> {
|
||||||
|
|
@ -213,6 +215,36 @@ fn system_texts_in_sink_session_start(
|
||||||
Vec::new()
|
Vec::new()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Pod metadata starts with a reserved Session and no Segment, then becomes
|
||||||
|
/// active once the first SegmentStart is materialized by `run`.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn pod_metadata_moves_from_pending_to_active_on_first_run() {
|
||||||
|
let client = MockClient::new(vec![single_text_events("hi")]);
|
||||||
|
let mut pod = make_pod(client).await;
|
||||||
|
let store = pod.store().clone();
|
||||||
|
let session_id = pod.session_id();
|
||||||
|
let initial_segment_id = pod.segment_id();
|
||||||
|
|
||||||
|
let pending = store
|
||||||
|
.read_by_name("test-pod")
|
||||||
|
.unwrap()
|
||||||
|
.expect("metadata should be initialized at Pod construction");
|
||||||
|
assert_eq!(pending.pod_name, "test-pod");
|
||||||
|
let pending_active = pending.active.expect("active session pointer missing");
|
||||||
|
assert_eq!(pending_active.session_id, session_id);
|
||||||
|
assert_eq!(pending_active.segment_id, None);
|
||||||
|
|
||||||
|
pod.run_text("first").await.unwrap();
|
||||||
|
|
||||||
|
let resolved = store
|
||||||
|
.read_by_name("test-pod")
|
||||||
|
.unwrap()
|
||||||
|
.expect("metadata should still exist after first run");
|
||||||
|
let active = resolved.active.expect("active session pointer missing");
|
||||||
|
assert_eq!(active.session_id, session_id);
|
||||||
|
assert_eq!(active.segment_id, Some(initial_segment_id));
|
||||||
|
}
|
||||||
|
|
||||||
/// Live auto-fork: when another writer extends the segment behind the
|
/// Live auto-fork: when another writer extends the segment behind the
|
||||||
/// Pod's back, the next run's `ensure_segment_head` detects the
|
/// Pod's back, the next run's `ensure_segment_head` detects the
|
||||||
/// entry-count drift and branches into a fresh segment **within the same
|
/// entry-count drift and branches into a fresh segment **within the same
|
||||||
|
|
@ -274,6 +306,13 @@ permission = "write"
|
||||||
let new_segment_id = pod.segment_id();
|
let new_segment_id = pod.segment_id();
|
||||||
assert_ne!(new_segment_id, source_segment_id);
|
assert_ne!(new_segment_id, source_segment_id);
|
||||||
assert_eq!(pod.session_id(), session_id, "auto-fork stays in-Session");
|
assert_eq!(pod.session_id(), session_id, "auto-fork stays in-Session");
|
||||||
|
let metadata = store
|
||||||
|
.read_by_name("test-pod")
|
||||||
|
.unwrap()
|
||||||
|
.expect("metadata should exist after auto-fork");
|
||||||
|
let active = metadata.active.expect("active session pointer missing");
|
||||||
|
assert_eq!(active.session_id, session_id);
|
||||||
|
assert_eq!(active.segment_id, Some(new_segment_id));
|
||||||
|
|
||||||
// New segment records forked_from pointing at the source.
|
// New segment records forked_from pointing at the source.
|
||||||
let new_entries = store.read_all(session_id, new_segment_id).unwrap();
|
let new_entries = store.read_all(session_id, new_segment_id).unwrap();
|
||||||
|
|
@ -312,7 +351,17 @@ async fn compact_emits_session_start_carrying_summary_and_task_snapshot() {
|
||||||
pod.attach_event_tx(tx);
|
pod.attach_event_tx(tx);
|
||||||
|
|
||||||
pod.run_text("first").await.unwrap();
|
pod.run_text("first").await.unwrap();
|
||||||
|
let session_id = pod.session_id();
|
||||||
pod.compact(10_000).await.unwrap();
|
pod.compact(10_000).await.unwrap();
|
||||||
|
let compacted_segment_id = pod.segment_id();
|
||||||
|
let metadata = pod
|
||||||
|
.store()
|
||||||
|
.read_by_name("test-pod")
|
||||||
|
.unwrap()
|
||||||
|
.expect("metadata should exist after compaction");
|
||||||
|
let active = metadata.active.expect("active session pointer missing");
|
||||||
|
assert_eq!(active.session_id, session_id);
|
||||||
|
assert_eq!(active.segment_id, Some(compacted_segment_id));
|
||||||
|
|
||||||
let system_texts = system_texts_in_sink_session_start(&pod);
|
let system_texts = system_texts_in_sink_session_start(&pod);
|
||||||
// The post-compaction `SegmentStart.history` carries the new system
|
// The post-compaction `SegmentStart.history` carries the new system
|
||||||
|
|
|
||||||
|
|
@ -417,21 +417,23 @@ async fn events_are_broadcast() {
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn double_run_returns_error() {
|
async fn double_run_returns_error() {
|
||||||
// Create a client that streams slowly
|
// Keep the first turn in-flight until the test drops the handle. A
|
||||||
|
// finite stream can finish before the second Method reaches the
|
||||||
|
// controller in the full test suite, making this assertion racy.
|
||||||
let events = vec![
|
let events = vec![
|
||||||
LlmEvent::text_block_start(0),
|
LlmEvent::text_block_start(0),
|
||||||
LlmEvent::text_delta(0, "slow..."),
|
LlmEvent::text_delta(0, "slow..."),
|
||||||
// No stop/completed — the stream will end but without proper completion
|
|
||||||
];
|
];
|
||||||
let client = MockClient::new(events);
|
let client = MockClient::sequential(vec![MockResponse::Hang(events)]);
|
||||||
let pod = make_pod(client).await;
|
let pod = make_pod(client).await;
|
||||||
let handle = spawn_controller(pod).await;
|
let handle = spawn_controller(pod).await;
|
||||||
let mut rx = handle.subscribe();
|
let mut rx = handle.subscribe();
|
||||||
|
|
||||||
// Send first run
|
// Send first run and wait until the controller has entered Running.
|
||||||
handle.send(Method::run_text("first")).await.unwrap();
|
handle.send(Method::run_text("first")).await.unwrap();
|
||||||
|
wait_for_status(&handle, PodStatus::Running).await;
|
||||||
|
|
||||||
// Immediately send second run (should get error)
|
// Now the second run must be rejected by drive_turn's live Method arm.
|
||||||
handle.send(Method::run_text("second")).await.unwrap();
|
handle.send(Method::run_text("second")).await.unwrap();
|
||||||
|
|
||||||
// Look for the error event
|
// Look for the error event
|
||||||
|
|
|
||||||
|
|
@ -8,7 +8,7 @@
|
||||||
use std::sync::{LazyLock, Mutex};
|
use std::sync::{LazyLock, Mutex};
|
||||||
|
|
||||||
use pod::{Pod, PodError};
|
use pod::{Pod, PodError};
|
||||||
use session_store::{FsStore, StoreError};
|
use session_store::{FsStore, PodActiveSegmentRef, PodMetadata, PodMetadataStore, StoreError};
|
||||||
|
|
||||||
const MINIMAL_MANIFEST_TOML: &str = r#"
|
const MINIMAL_MANIFEST_TOML: &str = r#"
|
||||||
[pod]
|
[pod]
|
||||||
|
|
@ -31,6 +31,96 @@ permission = "write"
|
||||||
/// pattern used by other integration tests in this crate.
|
/// pattern used by other integration tests in this crate.
|
||||||
static ENV_LOCK: LazyLock<Mutex<()>> = LazyLock::new(|| Mutex::new(()));
|
static ENV_LOCK: LazyLock<Mutex<()>> = LazyLock::new(|| Mutex::new(()));
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn restore_from_pod_metadata_rejects_missing_metadata() {
|
||||||
|
let _lock = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
|
||||||
|
|
||||||
|
let store_tmp = tempfile::tempdir().unwrap();
|
||||||
|
let store = FsStore::new(store_tmp.path()).unwrap();
|
||||||
|
let manifest = pod::PodManifest::from_toml(MINIMAL_MANIFEST_TOML).unwrap();
|
||||||
|
|
||||||
|
let result = Pod::restore_from_pod_metadata(
|
||||||
|
"restore-test",
|
||||||
|
manifest,
|
||||||
|
store,
|
||||||
|
pod::PromptLoader::builtins_only(),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
match result {
|
||||||
|
Err(PodError::PodMetadataMissing { pod_name }) => assert_eq!(pod_name, "restore-test"),
|
||||||
|
Err(other) => panic!("expected PodMetadataMissing, got {other:?}"),
|
||||||
|
Ok(_) => panic!("expected missing pod metadata to fail"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn restore_from_pod_metadata_rejects_pending_segment() {
|
||||||
|
let _lock = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
|
||||||
|
|
||||||
|
let store_tmp = tempfile::tempdir().unwrap();
|
||||||
|
let store = FsStore::new(store_tmp.path()).unwrap();
|
||||||
|
let manifest = pod::PodManifest::from_toml(MINIMAL_MANIFEST_TOML).unwrap();
|
||||||
|
let session_id = session_store::new_session_id();
|
||||||
|
store
|
||||||
|
.write(&PodMetadata::new(
|
||||||
|
"restore-test",
|
||||||
|
Some(PodActiveSegmentRef::pending_segment(session_id)),
|
||||||
|
))
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let result = Pod::restore_from_pod_metadata(
|
||||||
|
"restore-test",
|
||||||
|
manifest,
|
||||||
|
store,
|
||||||
|
pod::PromptLoader::builtins_only(),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
match result {
|
||||||
|
Err(PodError::PodMetadataPending {
|
||||||
|
pod_name,
|
||||||
|
session_id: actual,
|
||||||
|
}) => {
|
||||||
|
assert_eq!(pod_name, "restore-test");
|
||||||
|
assert_eq!(actual, session_id);
|
||||||
|
}
|
||||||
|
Err(other) => panic!("expected PodMetadataPending, got {other:?}"),
|
||||||
|
Ok(_) => panic!("expected pending pod metadata to fail"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn restore_from_pod_metadata_resolves_active_pointer_through_session_log() {
|
||||||
|
let _lock = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
|
||||||
|
|
||||||
|
let store_tmp = tempfile::tempdir().unwrap();
|
||||||
|
let store = FsStore::new(store_tmp.path()).unwrap();
|
||||||
|
let manifest = pod::PodManifest::from_toml(MINIMAL_MANIFEST_TOML).unwrap();
|
||||||
|
let session_id = session_store::new_session_id();
|
||||||
|
let segment_id = session_store::new_segment_id();
|
||||||
|
store
|
||||||
|
.write(&PodMetadata::new(
|
||||||
|
"restore-test",
|
||||||
|
Some(PodActiveSegmentRef::active_segment(session_id, segment_id)),
|
||||||
|
))
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let result = Pod::restore_from_pod_metadata(
|
||||||
|
"restore-test",
|
||||||
|
manifest,
|
||||||
|
store,
|
||||||
|
pod::PromptLoader::builtins_only(),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
match result {
|
||||||
|
Err(PodError::Store(StoreError::NotFound(id))) => assert_eq!(id, segment_id),
|
||||||
|
Err(other) => panic!("expected Store(NotFound) from resolved segment, got {other:?}"),
|
||||||
|
Ok(_) => panic!("expected unknown resolved segment to fail"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn restore_from_manifest_rejects_unknown_segment() {
|
async fn restore_from_manifest_rejects_unknown_segment() {
|
||||||
let _lock = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
|
let _lock = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue
Block a user