feat: wire pod metadata lifecycle writes
This commit is contained in:
parent
78209d5126
commit
58608c4f57
|
|
@ -9,8 +9,8 @@ use llm_worker::llm_client::client::LlmClient;
|
|||
use llm_worker::state::Mutable;
|
||||
use llm_worker::{ToolOutputLimits, UsageRecord, Worker, WorkerError, WorkerResult};
|
||||
use session_store::{
|
||||
LogEntry, PodScopeSnapshot, SegmentId, SessionId, Store, StoreError, SystemItem, segment_log,
|
||||
to_logged,
|
||||
LogEntry, PodActiveSegmentRef, PodMetadata, PodMetadataStore, PodScopeSnapshot, SegmentId,
|
||||
SessionId, Store, StoreError, SystemItem, segment_log, to_logged,
|
||||
};
|
||||
use tracing::{info, warn};
|
||||
|
||||
|
|
@ -50,6 +50,16 @@ pub struct SegmentLocation {
|
|||
pub segment_id: SegmentId,
|
||||
}
|
||||
|
||||
type PodMetadataWriter = Arc<dyn Fn(PodMetadata) -> Result<(), StoreError> + Send + Sync>;
|
||||
|
||||
fn pod_metadata_writer_for_store<St>(store: &St) -> PodMetadataWriter
|
||||
where
|
||||
St: PodMetadataStore + Clone + Send + Sync + 'static,
|
||||
{
|
||||
let store = store.clone();
|
||||
Arc::new(move |metadata| store.write(&metadata))
|
||||
}
|
||||
|
||||
/// Lock-free shared session/segment pointer.
|
||||
///
|
||||
/// Holds the current `(SessionId, SegmentId)` pair and the append tally
|
||||
|
|
@ -181,6 +191,10 @@ pub struct Pod<C: LlmClient, St: Store> {
|
|||
/// Always `Some` outside of `run()`/`resume()`.
|
||||
worker: Option<Worker<C, Mutable>>,
|
||||
store: St,
|
||||
/// Optional write-through hook for name-keyed Pod metadata. Production
|
||||
/// constructors install this from the same FsStore that owns the session
|
||||
/// logs; low-level `Pod::new` tests leave it absent.
|
||||
pod_metadata_writer: Option<PodMetadataWriter>,
|
||||
/// Shared session pointer. Source of truth for the Pod's current
|
||||
/// `segment_id` and append tally. `self.segment_id()` is a thin
|
||||
/// wrapper over `segment_state.segment_id()`.
|
||||
|
|
@ -358,6 +372,7 @@ impl<C: LlmClient + Clone + 'static, St: Store + Clone + 'static> Pod<C, St> {
|
|||
manifest: self.manifest.clone(),
|
||||
worker: Some(worker),
|
||||
store: self.store.clone(),
|
||||
pod_metadata_writer: None,
|
||||
segment_state: self.segment_state.clone(),
|
||||
pwd: self.pwd.clone(),
|
||||
scope: self.scope.clone(),
|
||||
|
|
@ -499,6 +514,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
manifest,
|
||||
worker: Some(worker),
|
||||
store,
|
||||
pod_metadata_writer: None,
|
||||
segment_state: SegmentState::new(session_id, segment_id, 0),
|
||||
pwd,
|
||||
scope: SharedScope::new(scope),
|
||||
|
|
@ -716,6 +732,41 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
&self.store
|
||||
}
|
||||
|
||||
fn write_pod_metadata_pending(&self) -> Result<(), StoreError> {
|
||||
let Some(writer) = &self.pod_metadata_writer else {
|
||||
return Ok(());
|
||||
};
|
||||
writer(PodMetadata::new(
|
||||
self.manifest.pod.name.clone(),
|
||||
Some(PodActiveSegmentRef::pending_segment(self.session_id())),
|
||||
))
|
||||
}
|
||||
|
||||
fn write_pod_metadata_active(&self, loc: SegmentLocation) -> Result<(), StoreError> {
|
||||
let Some(writer) = &self.pod_metadata_writer else {
|
||||
return Ok(());
|
||||
};
|
||||
writer(PodMetadata::new(
|
||||
self.manifest.pod.name.clone(),
|
||||
Some(PodActiveSegmentRef::active_segment(
|
||||
loc.session_id,
|
||||
loc.segment_id,
|
||||
)),
|
||||
))
|
||||
}
|
||||
|
||||
/// Enable name-keyed Pod metadata write-through for Pods built through
|
||||
/// the low-level constructor. High-level manifest constructors enable it
|
||||
/// automatically; this hook lets tests and custom embedders opt into the
|
||||
/// same persistence behavior without changing `Pod::new`'s minimal bounds.
|
||||
pub fn enable_pod_metadata_write_through(&mut self) -> Result<(), StoreError>
|
||||
where
|
||||
St: PodMetadataStore + Clone + Send + Sync + 'static,
|
||||
{
|
||||
self.pod_metadata_writer = Some(pod_metadata_writer_for_store(&self.store));
|
||||
self.write_pod_metadata_pending()
|
||||
}
|
||||
|
||||
/// Current history items held by the underlying Worker.
|
||||
pub fn history(&self) -> &[Item] {
|
||||
self.worker().history()
|
||||
|
|
@ -1660,6 +1711,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
};
|
||||
self.commit_entry(initial)?;
|
||||
self.persist_scope_snapshot()?;
|
||||
self.write_pod_metadata_active(loc)?;
|
||||
return Ok(());
|
||||
}
|
||||
// Check store count + auto-fork if it drifted.
|
||||
|
|
@ -1703,6 +1755,10 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
if self.scope_allocation.is_some() {
|
||||
pod_registry::update_segment(&self.manifest.pod.name, fork_segment_id)?;
|
||||
}
|
||||
self.write_pod_metadata_active(SegmentLocation {
|
||||
session_id: loc.session_id,
|
||||
segment_id: fork_segment_id,
|
||||
})?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
|
@ -2220,6 +2276,10 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
if self.scope_allocation.is_some() {
|
||||
pod_registry::update_segment(&self.manifest.pod.name, new_segment_id)?;
|
||||
}
|
||||
self.write_pod_metadata_active(SegmentLocation {
|
||||
session_id: old_loc.session_id,
|
||||
segment_id: new_segment_id,
|
||||
})?;
|
||||
// Align user_segments with the post-compaction history. Items
|
||||
// before `retain_from` (now folded into the summary) lose their
|
||||
// segments; only the user_messages surviving in retained_items
|
||||
|
|
@ -2760,7 +2820,10 @@ enum ConsolidateDecision {
|
|||
Completed,
|
||||
}
|
||||
|
||||
impl<St: Store> Pod<Box<dyn LlmClient>, St> {
|
||||
impl<St> Pod<Box<dyn LlmClient>, St>
|
||||
where
|
||||
St: Store + PodMetadataStore + Clone + Send + Sync + 'static,
|
||||
{
|
||||
/// Create a Pod entirely from a validated manifest.
|
||||
///
|
||||
/// The Pod's working directory is captured once here from the
|
||||
|
|
@ -2808,11 +2871,13 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
|
|||
let mut worker = Worker::new(common.client);
|
||||
apply_worker_manifest(&mut worker, &manifest.worker);
|
||||
worker.set_cache_key(Some(segment_id.to_string()));
|
||||
let pod_metadata_writer = Some(pod_metadata_writer_for_store(&store));
|
||||
|
||||
let mut pod = Self {
|
||||
manifest,
|
||||
worker: Some(worker),
|
||||
store,
|
||||
pod_metadata_writer,
|
||||
segment_state: SegmentState::new(session_id, segment_id, 0),
|
||||
pwd: common.pwd,
|
||||
scope: SharedScope::new(common.scope),
|
||||
|
|
@ -2847,6 +2912,7 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
|
|||
};
|
||||
pod.apply_permissions_from_manifest();
|
||||
pod.apply_prune_from_manifest();
|
||||
pod.write_pod_metadata_pending()?;
|
||||
drain_skill_shadows(&pod, skill_shadows);
|
||||
Ok(pod)
|
||||
}
|
||||
|
|
@ -2881,11 +2947,13 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
|
|||
let mut worker = Worker::new(common.client);
|
||||
apply_worker_manifest(&mut worker, &manifest.worker);
|
||||
worker.set_cache_key(Some(segment_id.to_string()));
|
||||
let pod_metadata_writer = Some(pod_metadata_writer_for_store(&store));
|
||||
|
||||
let mut pod = Self {
|
||||
manifest,
|
||||
worker: Some(worker),
|
||||
store,
|
||||
pod_metadata_writer,
|
||||
segment_state: SegmentState::new(session_id, segment_id, 0),
|
||||
pwd: common.pwd,
|
||||
scope: SharedScope::new(common.scope),
|
||||
|
|
@ -2920,10 +2988,41 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
|
|||
};
|
||||
pod.apply_permissions_from_manifest();
|
||||
pod.apply_prune_from_manifest();
|
||||
pod.write_pod_metadata_pending()?;
|
||||
drain_skill_shadows(&pod, skill_shadows);
|
||||
Ok(pod)
|
||||
}
|
||||
|
||||
/// Restore a Pod by resolving its name-keyed metadata to an active
|
||||
/// `(SessionId, SegmentId)` and then using the normal session-log restore
|
||||
/// path. The metadata stores only the active pointer; lineage and origin
|
||||
/// remain authoritative in the session log.
|
||||
pub async fn restore_from_pod_metadata(
|
||||
pod_name: &str,
|
||||
manifest: PodManifest,
|
||||
store: St,
|
||||
loader: PromptLoader,
|
||||
) -> Result<Self, PodError> {
|
||||
let metadata =
|
||||
store
|
||||
.read_by_name(pod_name)?
|
||||
.ok_or_else(|| PodError::PodMetadataMissing {
|
||||
pod_name: pod_name.to_string(),
|
||||
})?;
|
||||
let active = metadata
|
||||
.active
|
||||
.ok_or_else(|| PodError::PodMetadataInactive {
|
||||
pod_name: pod_name.to_string(),
|
||||
})?;
|
||||
let segment_id = active
|
||||
.segment_id
|
||||
.ok_or_else(|| PodError::PodMetadataPending {
|
||||
pod_name: pod_name.to_string(),
|
||||
session_id: active.session_id,
|
||||
})?;
|
||||
Self::restore_from_manifest(active.session_id, segment_id, manifest, store, loader).await
|
||||
}
|
||||
|
||||
/// Restore a Pod from an existing session log.
|
||||
///
|
||||
/// Resolves the manifest cascade exactly like [`Self::from_manifest`]
|
||||
|
|
@ -3021,11 +3120,13 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
|
|||
|
||||
let extract_pointer = memory::extract::fold_pointer(&state.extensions);
|
||||
let task_store = tools::TaskStore::from_history(&state.history);
|
||||
let pod_metadata_writer = Some(pod_metadata_writer_for_store(&store));
|
||||
|
||||
let mut pod = Self {
|
||||
manifest,
|
||||
worker: Some(worker),
|
||||
store,
|
||||
pod_metadata_writer,
|
||||
segment_state: SegmentState::new(session_id, segment_id, state.entries_count),
|
||||
pwd: common.pwd,
|
||||
scope: SharedScope::new(common.scope),
|
||||
|
|
@ -3065,6 +3166,10 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
|
|||
};
|
||||
pod.apply_permissions_from_manifest();
|
||||
pod.apply_prune_from_manifest();
|
||||
pod.write_pod_metadata_active(SegmentLocation {
|
||||
session_id,
|
||||
segment_id,
|
||||
})?;
|
||||
drain_skill_shadows(&pod, skill_shadows);
|
||||
Ok(pod)
|
||||
}
|
||||
|
|
@ -3293,6 +3398,20 @@ pub enum PodError {
|
|||
"session {segment_id} has no persisted scope snapshot; refusing resume without explicit scope"
|
||||
)]
|
||||
SegmentScopeMissing { segment_id: SegmentId },
|
||||
|
||||
#[error("pod metadata for {pod_name} was not found")]
|
||||
PodMetadataMissing { pod_name: String },
|
||||
|
||||
#[error("pod metadata for {pod_name} has no active session")]
|
||||
PodMetadataInactive { pod_name: String },
|
||||
|
||||
#[error(
|
||||
"pod metadata for {pod_name} points to session {session_id} but no segment is materialized yet"
|
||||
)]
|
||||
PodMetadataPending {
|
||||
pod_name: String,
|
||||
session_id: SessionId,
|
||||
},
|
||||
}
|
||||
|
||||
/// Bundle of resources that every high-level Pod constructor needs:
|
||||
|
|
|
|||
|
|
@ -17,7 +17,7 @@ use llm_worker::llm_client::event::{Event as LlmEvent, ResponseStatus, StatusEve
|
|||
use llm_worker::llm_client::types::Item;
|
||||
use llm_worker::llm_client::{ClientError, LlmClient, Request};
|
||||
use protocol::Event;
|
||||
use session_store::{FsStore, LogEntry, Store};
|
||||
use session_store::{FsStore, LogEntry, PodMetadataStore, Store};
|
||||
use tokio::sync::broadcast;
|
||||
|
||||
use pod::Pod;
|
||||
|
|
@ -158,7 +158,9 @@ async fn make_pod_with_manifest(
|
|||
std::mem::forget(pwd_tmp);
|
||||
|
||||
let worker = Worker::new(client);
|
||||
Pod::new(manifest, worker, store, pwd, scope).await.unwrap()
|
||||
let mut pod = Pod::new(manifest, worker, store, pwd, scope).await.unwrap();
|
||||
pod.enable_pod_metadata_write_through().unwrap();
|
||||
pod
|
||||
}
|
||||
|
||||
async fn make_pod(client: MockClient) -> Pod<MockClient, FsStore> {
|
||||
|
|
@ -213,6 +215,36 @@ fn system_texts_in_sink_session_start(
|
|||
Vec::new()
|
||||
}
|
||||
|
||||
/// Pod metadata starts with a reserved Session and no Segment, then becomes
|
||||
/// active once the first SegmentStart is materialized by `run`.
|
||||
#[tokio::test]
|
||||
async fn pod_metadata_moves_from_pending_to_active_on_first_run() {
|
||||
let client = MockClient::new(vec![single_text_events("hi")]);
|
||||
let mut pod = make_pod(client).await;
|
||||
let store = pod.store().clone();
|
||||
let session_id = pod.session_id();
|
||||
let initial_segment_id = pod.segment_id();
|
||||
|
||||
let pending = store
|
||||
.read_by_name("test-pod")
|
||||
.unwrap()
|
||||
.expect("metadata should be initialized at Pod construction");
|
||||
assert_eq!(pending.pod_name, "test-pod");
|
||||
let pending_active = pending.active.expect("active session pointer missing");
|
||||
assert_eq!(pending_active.session_id, session_id);
|
||||
assert_eq!(pending_active.segment_id, None);
|
||||
|
||||
pod.run_text("first").await.unwrap();
|
||||
|
||||
let resolved = store
|
||||
.read_by_name("test-pod")
|
||||
.unwrap()
|
||||
.expect("metadata should still exist after first run");
|
||||
let active = resolved.active.expect("active session pointer missing");
|
||||
assert_eq!(active.session_id, session_id);
|
||||
assert_eq!(active.segment_id, Some(initial_segment_id));
|
||||
}
|
||||
|
||||
/// Live auto-fork: when another writer extends the segment behind the
|
||||
/// Pod's back, the next run's `ensure_segment_head` detects the
|
||||
/// entry-count drift and branches into a fresh segment **within the same
|
||||
|
|
@ -274,6 +306,13 @@ permission = "write"
|
|||
let new_segment_id = pod.segment_id();
|
||||
assert_ne!(new_segment_id, source_segment_id);
|
||||
assert_eq!(pod.session_id(), session_id, "auto-fork stays in-Session");
|
||||
let metadata = store
|
||||
.read_by_name("test-pod")
|
||||
.unwrap()
|
||||
.expect("metadata should exist after auto-fork");
|
||||
let active = metadata.active.expect("active session pointer missing");
|
||||
assert_eq!(active.session_id, session_id);
|
||||
assert_eq!(active.segment_id, Some(new_segment_id));
|
||||
|
||||
// New segment records forked_from pointing at the source.
|
||||
let new_entries = store.read_all(session_id, new_segment_id).unwrap();
|
||||
|
|
@ -312,7 +351,17 @@ async fn compact_emits_session_start_carrying_summary_and_task_snapshot() {
|
|||
pod.attach_event_tx(tx);
|
||||
|
||||
pod.run_text("first").await.unwrap();
|
||||
let session_id = pod.session_id();
|
||||
pod.compact(10_000).await.unwrap();
|
||||
let compacted_segment_id = pod.segment_id();
|
||||
let metadata = pod
|
||||
.store()
|
||||
.read_by_name("test-pod")
|
||||
.unwrap()
|
||||
.expect("metadata should exist after compaction");
|
||||
let active = metadata.active.expect("active session pointer missing");
|
||||
assert_eq!(active.session_id, session_id);
|
||||
assert_eq!(active.segment_id, Some(compacted_segment_id));
|
||||
|
||||
let system_texts = system_texts_in_sink_session_start(&pod);
|
||||
// The post-compaction `SegmentStart.history` carries the new system
|
||||
|
|
|
|||
|
|
@ -417,21 +417,23 @@ async fn events_are_broadcast() {
|
|||
|
||||
#[tokio::test]
|
||||
async fn double_run_returns_error() {
|
||||
// Create a client that streams slowly
|
||||
// Keep the first turn in-flight until the test drops the handle. A
|
||||
// finite stream can finish before the second Method reaches the
|
||||
// controller in the full test suite, making this assertion racy.
|
||||
let events = vec![
|
||||
LlmEvent::text_block_start(0),
|
||||
LlmEvent::text_delta(0, "slow..."),
|
||||
// No stop/completed — the stream will end but without proper completion
|
||||
];
|
||||
let client = MockClient::new(events);
|
||||
let client = MockClient::sequential(vec![MockResponse::Hang(events)]);
|
||||
let pod = make_pod(client).await;
|
||||
let handle = spawn_controller(pod).await;
|
||||
let mut rx = handle.subscribe();
|
||||
|
||||
// Send first run
|
||||
// Send first run and wait until the controller has entered Running.
|
||||
handle.send(Method::run_text("first")).await.unwrap();
|
||||
wait_for_status(&handle, PodStatus::Running).await;
|
||||
|
||||
// Immediately send second run (should get error)
|
||||
// Now the second run must be rejected by drive_turn's live Method arm.
|
||||
handle.send(Method::run_text("second")).await.unwrap();
|
||||
|
||||
// Look for the error event
|
||||
|
|
|
|||
|
|
@ -8,7 +8,7 @@
|
|||
use std::sync::{LazyLock, Mutex};
|
||||
|
||||
use pod::{Pod, PodError};
|
||||
use session_store::{FsStore, StoreError};
|
||||
use session_store::{FsStore, PodActiveSegmentRef, PodMetadata, PodMetadataStore, StoreError};
|
||||
|
||||
const MINIMAL_MANIFEST_TOML: &str = r#"
|
||||
[pod]
|
||||
|
|
@ -31,6 +31,96 @@ permission = "write"
|
|||
/// pattern used by other integration tests in this crate.
|
||||
static ENV_LOCK: LazyLock<Mutex<()>> = LazyLock::new(|| Mutex::new(()));
|
||||
|
||||
#[tokio::test]
|
||||
async fn restore_from_pod_metadata_rejects_missing_metadata() {
|
||||
let _lock = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
|
||||
|
||||
let store_tmp = tempfile::tempdir().unwrap();
|
||||
let store = FsStore::new(store_tmp.path()).unwrap();
|
||||
let manifest = pod::PodManifest::from_toml(MINIMAL_MANIFEST_TOML).unwrap();
|
||||
|
||||
let result = Pod::restore_from_pod_metadata(
|
||||
"restore-test",
|
||||
manifest,
|
||||
store,
|
||||
pod::PromptLoader::builtins_only(),
|
||||
)
|
||||
.await;
|
||||
|
||||
match result {
|
||||
Err(PodError::PodMetadataMissing { pod_name }) => assert_eq!(pod_name, "restore-test"),
|
||||
Err(other) => panic!("expected PodMetadataMissing, got {other:?}"),
|
||||
Ok(_) => panic!("expected missing pod metadata to fail"),
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn restore_from_pod_metadata_rejects_pending_segment() {
|
||||
let _lock = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
|
||||
|
||||
let store_tmp = tempfile::tempdir().unwrap();
|
||||
let store = FsStore::new(store_tmp.path()).unwrap();
|
||||
let manifest = pod::PodManifest::from_toml(MINIMAL_MANIFEST_TOML).unwrap();
|
||||
let session_id = session_store::new_session_id();
|
||||
store
|
||||
.write(&PodMetadata::new(
|
||||
"restore-test",
|
||||
Some(PodActiveSegmentRef::pending_segment(session_id)),
|
||||
))
|
||||
.unwrap();
|
||||
|
||||
let result = Pod::restore_from_pod_metadata(
|
||||
"restore-test",
|
||||
manifest,
|
||||
store,
|
||||
pod::PromptLoader::builtins_only(),
|
||||
)
|
||||
.await;
|
||||
|
||||
match result {
|
||||
Err(PodError::PodMetadataPending {
|
||||
pod_name,
|
||||
session_id: actual,
|
||||
}) => {
|
||||
assert_eq!(pod_name, "restore-test");
|
||||
assert_eq!(actual, session_id);
|
||||
}
|
||||
Err(other) => panic!("expected PodMetadataPending, got {other:?}"),
|
||||
Ok(_) => panic!("expected pending pod metadata to fail"),
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn restore_from_pod_metadata_resolves_active_pointer_through_session_log() {
|
||||
let _lock = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
|
||||
|
||||
let store_tmp = tempfile::tempdir().unwrap();
|
||||
let store = FsStore::new(store_tmp.path()).unwrap();
|
||||
let manifest = pod::PodManifest::from_toml(MINIMAL_MANIFEST_TOML).unwrap();
|
||||
let session_id = session_store::new_session_id();
|
||||
let segment_id = session_store::new_segment_id();
|
||||
store
|
||||
.write(&PodMetadata::new(
|
||||
"restore-test",
|
||||
Some(PodActiveSegmentRef::active_segment(session_id, segment_id)),
|
||||
))
|
||||
.unwrap();
|
||||
|
||||
let result = Pod::restore_from_pod_metadata(
|
||||
"restore-test",
|
||||
manifest,
|
||||
store,
|
||||
pod::PromptLoader::builtins_only(),
|
||||
)
|
||||
.await;
|
||||
|
||||
match result {
|
||||
Err(PodError::Store(StoreError::NotFound(id))) => assert_eq!(id, segment_id),
|
||||
Err(other) => panic!("expected Store(NotFound) from resolved segment, got {other:?}"),
|
||||
Ok(_) => panic!("expected unknown resolved segment to fail"),
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn restore_from_manifest_rejects_unknown_segment() {
|
||||
let _lock = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user