diff --git a/crates/pod/src/pod.rs b/crates/pod/src/pod.rs index 0f1112de..b2c67097 100644 --- a/crates/pod/src/pod.rs +++ b/crates/pod/src/pod.rs @@ -9,8 +9,8 @@ use llm_worker::llm_client::client::LlmClient; use llm_worker::state::Mutable; use llm_worker::{ToolOutputLimits, UsageRecord, Worker, WorkerError, WorkerResult}; use session_store::{ - LogEntry, PodScopeSnapshot, SegmentId, SessionId, Store, StoreError, SystemItem, segment_log, - to_logged, + LogEntry, PodActiveSegmentRef, PodMetadata, PodMetadataStore, PodScopeSnapshot, SegmentId, + SessionId, Store, StoreError, SystemItem, segment_log, to_logged, }; use tracing::{info, warn}; @@ -50,6 +50,16 @@ pub struct SegmentLocation { pub segment_id: SegmentId, } +type PodMetadataWriter = Arc Result<(), StoreError> + Send + Sync>; + +fn pod_metadata_writer_for_store(store: &St) -> PodMetadataWriter +where + St: PodMetadataStore + Clone + Send + Sync + 'static, +{ + let store = store.clone(); + Arc::new(move |metadata| store.write(&metadata)) +} + /// Lock-free shared session/segment pointer. /// /// Holds the current `(SessionId, SegmentId)` pair and the append tally @@ -181,6 +191,10 @@ pub struct Pod { /// Always `Some` outside of `run()`/`resume()`. worker: Option>, store: St, + /// Optional write-through hook for name-keyed Pod metadata. Production + /// constructors install this from the same FsStore that owns the session + /// logs; low-level `Pod::new` tests leave it absent. + pod_metadata_writer: Option, /// Shared session pointer. Source of truth for the Pod's current /// `segment_id` and append tally. `self.segment_id()` is a thin /// wrapper over `segment_state.segment_id()`. @@ -358,6 +372,7 @@ impl Pod { manifest: self.manifest.clone(), worker: Some(worker), store: self.store.clone(), + pod_metadata_writer: None, segment_state: self.segment_state.clone(), pwd: self.pwd.clone(), scope: self.scope.clone(), @@ -499,6 +514,7 @@ impl Pod { manifest, worker: Some(worker), store, + pod_metadata_writer: None, segment_state: SegmentState::new(session_id, segment_id, 0), pwd, scope: SharedScope::new(scope), @@ -716,6 +732,41 @@ impl Pod { &self.store } + fn write_pod_metadata_pending(&self) -> Result<(), StoreError> { + let Some(writer) = &self.pod_metadata_writer else { + return Ok(()); + }; + writer(PodMetadata::new( + self.manifest.pod.name.clone(), + Some(PodActiveSegmentRef::pending_segment(self.session_id())), + )) + } + + fn write_pod_metadata_active(&self, loc: SegmentLocation) -> Result<(), StoreError> { + let Some(writer) = &self.pod_metadata_writer else { + return Ok(()); + }; + writer(PodMetadata::new( + self.manifest.pod.name.clone(), + Some(PodActiveSegmentRef::active_segment( + loc.session_id, + loc.segment_id, + )), + )) + } + + /// Enable name-keyed Pod metadata write-through for Pods built through + /// the low-level constructor. High-level manifest constructors enable it + /// automatically; this hook lets tests and custom embedders opt into the + /// same persistence behavior without changing `Pod::new`'s minimal bounds. + pub fn enable_pod_metadata_write_through(&mut self) -> Result<(), StoreError> + where + St: PodMetadataStore + Clone + Send + Sync + 'static, + { + self.pod_metadata_writer = Some(pod_metadata_writer_for_store(&self.store)); + self.write_pod_metadata_pending() + } + /// Current history items held by the underlying Worker. pub fn history(&self) -> &[Item] { self.worker().history() @@ -1660,6 +1711,7 @@ impl Pod { }; self.commit_entry(initial)?; self.persist_scope_snapshot()?; + self.write_pod_metadata_active(loc)?; return Ok(()); } // Check store count + auto-fork if it drifted. @@ -1703,6 +1755,10 @@ impl Pod { if self.scope_allocation.is_some() { pod_registry::update_segment(&self.manifest.pod.name, fork_segment_id)?; } + self.write_pod_metadata_active(SegmentLocation { + session_id: loc.session_id, + segment_id: fork_segment_id, + })?; Ok(()) } @@ -2220,6 +2276,10 @@ impl Pod { if self.scope_allocation.is_some() { pod_registry::update_segment(&self.manifest.pod.name, new_segment_id)?; } + self.write_pod_metadata_active(SegmentLocation { + session_id: old_loc.session_id, + segment_id: new_segment_id, + })?; // Align user_segments with the post-compaction history. Items // before `retain_from` (now folded into the summary) lose their // segments; only the user_messages surviving in retained_items @@ -2760,7 +2820,10 @@ enum ConsolidateDecision { Completed, } -impl Pod, St> { +impl Pod, St> +where + St: Store + PodMetadataStore + Clone + Send + Sync + 'static, +{ /// Create a Pod entirely from a validated manifest. /// /// The Pod's working directory is captured once here from the @@ -2808,11 +2871,13 @@ impl Pod, St> { let mut worker = Worker::new(common.client); apply_worker_manifest(&mut worker, &manifest.worker); worker.set_cache_key(Some(segment_id.to_string())); + let pod_metadata_writer = Some(pod_metadata_writer_for_store(&store)); let mut pod = Self { manifest, worker: Some(worker), store, + pod_metadata_writer, segment_state: SegmentState::new(session_id, segment_id, 0), pwd: common.pwd, scope: SharedScope::new(common.scope), @@ -2847,6 +2912,7 @@ impl Pod, St> { }; pod.apply_permissions_from_manifest(); pod.apply_prune_from_manifest(); + pod.write_pod_metadata_pending()?; drain_skill_shadows(&pod, skill_shadows); Ok(pod) } @@ -2881,11 +2947,13 @@ impl Pod, St> { let mut worker = Worker::new(common.client); apply_worker_manifest(&mut worker, &manifest.worker); worker.set_cache_key(Some(segment_id.to_string())); + let pod_metadata_writer = Some(pod_metadata_writer_for_store(&store)); let mut pod = Self { manifest, worker: Some(worker), store, + pod_metadata_writer, segment_state: SegmentState::new(session_id, segment_id, 0), pwd: common.pwd, scope: SharedScope::new(common.scope), @@ -2920,10 +2988,41 @@ impl Pod, St> { }; pod.apply_permissions_from_manifest(); pod.apply_prune_from_manifest(); + pod.write_pod_metadata_pending()?; drain_skill_shadows(&pod, skill_shadows); Ok(pod) } + /// Restore a Pod by resolving its name-keyed metadata to an active + /// `(SessionId, SegmentId)` and then using the normal session-log restore + /// path. The metadata stores only the active pointer; lineage and origin + /// remain authoritative in the session log. + pub async fn restore_from_pod_metadata( + pod_name: &str, + manifest: PodManifest, + store: St, + loader: PromptLoader, + ) -> Result { + let metadata = + store + .read_by_name(pod_name)? + .ok_or_else(|| PodError::PodMetadataMissing { + pod_name: pod_name.to_string(), + })?; + let active = metadata + .active + .ok_or_else(|| PodError::PodMetadataInactive { + pod_name: pod_name.to_string(), + })?; + let segment_id = active + .segment_id + .ok_or_else(|| PodError::PodMetadataPending { + pod_name: pod_name.to_string(), + session_id: active.session_id, + })?; + Self::restore_from_manifest(active.session_id, segment_id, manifest, store, loader).await + } + /// Restore a Pod from an existing session log. /// /// Resolves the manifest cascade exactly like [`Self::from_manifest`] @@ -3021,11 +3120,13 @@ impl Pod, St> { let extract_pointer = memory::extract::fold_pointer(&state.extensions); let task_store = tools::TaskStore::from_history(&state.history); + let pod_metadata_writer = Some(pod_metadata_writer_for_store(&store)); let mut pod = Self { manifest, worker: Some(worker), store, + pod_metadata_writer, segment_state: SegmentState::new(session_id, segment_id, state.entries_count), pwd: common.pwd, scope: SharedScope::new(common.scope), @@ -3065,6 +3166,10 @@ impl Pod, St> { }; pod.apply_permissions_from_manifest(); pod.apply_prune_from_manifest(); + pod.write_pod_metadata_active(SegmentLocation { + session_id, + segment_id, + })?; drain_skill_shadows(&pod, skill_shadows); Ok(pod) } @@ -3293,6 +3398,20 @@ pub enum PodError { "session {segment_id} has no persisted scope snapshot; refusing resume without explicit scope" )] SegmentScopeMissing { segment_id: SegmentId }, + + #[error("pod metadata for {pod_name} was not found")] + PodMetadataMissing { pod_name: String }, + + #[error("pod metadata for {pod_name} has no active session")] + PodMetadataInactive { pod_name: String }, + + #[error( + "pod metadata for {pod_name} points to session {session_id} but no segment is materialized yet" + )] + PodMetadataPending { + pod_name: String, + session_id: SessionId, + }, } /// Bundle of resources that every high-level Pod constructor needs: diff --git a/crates/pod/tests/compact_events_test.rs b/crates/pod/tests/compact_events_test.rs index 102bba6a..7ff77687 100644 --- a/crates/pod/tests/compact_events_test.rs +++ b/crates/pod/tests/compact_events_test.rs @@ -17,7 +17,7 @@ use llm_worker::llm_client::event::{Event as LlmEvent, ResponseStatus, StatusEve use llm_worker::llm_client::types::Item; use llm_worker::llm_client::{ClientError, LlmClient, Request}; use protocol::Event; -use session_store::{FsStore, LogEntry, Store}; +use session_store::{FsStore, LogEntry, PodMetadataStore, Store}; use tokio::sync::broadcast; use pod::Pod; @@ -158,7 +158,9 @@ async fn make_pod_with_manifest( std::mem::forget(pwd_tmp); let worker = Worker::new(client); - Pod::new(manifest, worker, store, pwd, scope).await.unwrap() + let mut pod = Pod::new(manifest, worker, store, pwd, scope).await.unwrap(); + pod.enable_pod_metadata_write_through().unwrap(); + pod } async fn make_pod(client: MockClient) -> Pod { @@ -213,6 +215,36 @@ fn system_texts_in_sink_session_start( Vec::new() } +/// Pod metadata starts with a reserved Session and no Segment, then becomes +/// active once the first SegmentStart is materialized by `run`. +#[tokio::test] +async fn pod_metadata_moves_from_pending_to_active_on_first_run() { + let client = MockClient::new(vec![single_text_events("hi")]); + let mut pod = make_pod(client).await; + let store = pod.store().clone(); + let session_id = pod.session_id(); + let initial_segment_id = pod.segment_id(); + + let pending = store + .read_by_name("test-pod") + .unwrap() + .expect("metadata should be initialized at Pod construction"); + assert_eq!(pending.pod_name, "test-pod"); + let pending_active = pending.active.expect("active session pointer missing"); + assert_eq!(pending_active.session_id, session_id); + assert_eq!(pending_active.segment_id, None); + + pod.run_text("first").await.unwrap(); + + let resolved = store + .read_by_name("test-pod") + .unwrap() + .expect("metadata should still exist after first run"); + let active = resolved.active.expect("active session pointer missing"); + assert_eq!(active.session_id, session_id); + assert_eq!(active.segment_id, Some(initial_segment_id)); +} + /// Live auto-fork: when another writer extends the segment behind the /// Pod's back, the next run's `ensure_segment_head` detects the /// entry-count drift and branches into a fresh segment **within the same @@ -274,6 +306,13 @@ permission = "write" let new_segment_id = pod.segment_id(); assert_ne!(new_segment_id, source_segment_id); assert_eq!(pod.session_id(), session_id, "auto-fork stays in-Session"); + let metadata = store + .read_by_name("test-pod") + .unwrap() + .expect("metadata should exist after auto-fork"); + let active = metadata.active.expect("active session pointer missing"); + assert_eq!(active.session_id, session_id); + assert_eq!(active.segment_id, Some(new_segment_id)); // New segment records forked_from pointing at the source. let new_entries = store.read_all(session_id, new_segment_id).unwrap(); @@ -312,7 +351,17 @@ async fn compact_emits_session_start_carrying_summary_and_task_snapshot() { pod.attach_event_tx(tx); pod.run_text("first").await.unwrap(); + let session_id = pod.session_id(); pod.compact(10_000).await.unwrap(); + let compacted_segment_id = pod.segment_id(); + let metadata = pod + .store() + .read_by_name("test-pod") + .unwrap() + .expect("metadata should exist after compaction"); + let active = metadata.active.expect("active session pointer missing"); + assert_eq!(active.session_id, session_id); + assert_eq!(active.segment_id, Some(compacted_segment_id)); let system_texts = system_texts_in_sink_session_start(&pod); // The post-compaction `SegmentStart.history` carries the new system diff --git a/crates/pod/tests/controller_test.rs b/crates/pod/tests/controller_test.rs index 9cc3a65c..0f5ea3b3 100644 --- a/crates/pod/tests/controller_test.rs +++ b/crates/pod/tests/controller_test.rs @@ -417,21 +417,23 @@ async fn events_are_broadcast() { #[tokio::test] async fn double_run_returns_error() { - // Create a client that streams slowly + // Keep the first turn in-flight until the test drops the handle. A + // finite stream can finish before the second Method reaches the + // controller in the full test suite, making this assertion racy. let events = vec![ LlmEvent::text_block_start(0), LlmEvent::text_delta(0, "slow..."), - // No stop/completed — the stream will end but without proper completion ]; - let client = MockClient::new(events); + let client = MockClient::sequential(vec![MockResponse::Hang(events)]); let pod = make_pod(client).await; let handle = spawn_controller(pod).await; let mut rx = handle.subscribe(); - // Send first run + // Send first run and wait until the controller has entered Running. handle.send(Method::run_text("first")).await.unwrap(); + wait_for_status(&handle, PodStatus::Running).await; - // Immediately send second run (should get error) + // Now the second run must be rejected by drive_turn's live Method arm. handle.send(Method::run_text("second")).await.unwrap(); // Look for the error event diff --git a/crates/pod/tests/restore_test.rs b/crates/pod/tests/restore_test.rs index c2ce782e..1dc97f37 100644 --- a/crates/pod/tests/restore_test.rs +++ b/crates/pod/tests/restore_test.rs @@ -8,7 +8,7 @@ use std::sync::{LazyLock, Mutex}; use pod::{Pod, PodError}; -use session_store::{FsStore, StoreError}; +use session_store::{FsStore, PodActiveSegmentRef, PodMetadata, PodMetadataStore, StoreError}; const MINIMAL_MANIFEST_TOML: &str = r#" [pod] @@ -31,6 +31,96 @@ permission = "write" /// pattern used by other integration tests in this crate. static ENV_LOCK: LazyLock> = LazyLock::new(|| Mutex::new(())); +#[tokio::test] +async fn restore_from_pod_metadata_rejects_missing_metadata() { + let _lock = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner()); + + let store_tmp = tempfile::tempdir().unwrap(); + let store = FsStore::new(store_tmp.path()).unwrap(); + let manifest = pod::PodManifest::from_toml(MINIMAL_MANIFEST_TOML).unwrap(); + + let result = Pod::restore_from_pod_metadata( + "restore-test", + manifest, + store, + pod::PromptLoader::builtins_only(), + ) + .await; + + match result { + Err(PodError::PodMetadataMissing { pod_name }) => assert_eq!(pod_name, "restore-test"), + Err(other) => panic!("expected PodMetadataMissing, got {other:?}"), + Ok(_) => panic!("expected missing pod metadata to fail"), + } +} + +#[tokio::test] +async fn restore_from_pod_metadata_rejects_pending_segment() { + let _lock = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner()); + + let store_tmp = tempfile::tempdir().unwrap(); + let store = FsStore::new(store_tmp.path()).unwrap(); + let manifest = pod::PodManifest::from_toml(MINIMAL_MANIFEST_TOML).unwrap(); + let session_id = session_store::new_session_id(); + store + .write(&PodMetadata::new( + "restore-test", + Some(PodActiveSegmentRef::pending_segment(session_id)), + )) + .unwrap(); + + let result = Pod::restore_from_pod_metadata( + "restore-test", + manifest, + store, + pod::PromptLoader::builtins_only(), + ) + .await; + + match result { + Err(PodError::PodMetadataPending { + pod_name, + session_id: actual, + }) => { + assert_eq!(pod_name, "restore-test"); + assert_eq!(actual, session_id); + } + Err(other) => panic!("expected PodMetadataPending, got {other:?}"), + Ok(_) => panic!("expected pending pod metadata to fail"), + } +} + +#[tokio::test] +async fn restore_from_pod_metadata_resolves_active_pointer_through_session_log() { + let _lock = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner()); + + let store_tmp = tempfile::tempdir().unwrap(); + let store = FsStore::new(store_tmp.path()).unwrap(); + let manifest = pod::PodManifest::from_toml(MINIMAL_MANIFEST_TOML).unwrap(); + let session_id = session_store::new_session_id(); + let segment_id = session_store::new_segment_id(); + store + .write(&PodMetadata::new( + "restore-test", + Some(PodActiveSegmentRef::active_segment(session_id, segment_id)), + )) + .unwrap(); + + let result = Pod::restore_from_pod_metadata( + "restore-test", + manifest, + store, + pod::PromptLoader::builtins_only(), + ) + .await; + + match result { + Err(PodError::Store(StoreError::NotFound(id))) => assert_eq!(id, segment_id), + Err(other) => panic!("expected Store(NotFound) from resolved segment, got {other:?}"), + Ok(_) => panic!("expected unknown resolved segment to fail"), + } +} + #[tokio::test] async fn restore_from_manifest_rejects_unknown_segment() { let _lock = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());