feat: wire pod metadata lifecycle writes

This commit is contained in:
Keisuke Hirata 2026-05-22 22:29:08 +09:00
parent e9cc4b90dc
commit 0954a4804a
No known key found for this signature in database
4 changed files with 271 additions and 11 deletions

View File

@ -9,8 +9,8 @@ use llm_worker::llm_client::client::LlmClient;
use llm_worker::state::Mutable;
use llm_worker::{ToolOutputLimits, UsageRecord, Worker, WorkerError, WorkerResult};
use session_store::{
LogEntry, PodScopeSnapshot, SegmentId, SessionId, Store, StoreError, SystemItem, segment_log,
to_logged,
LogEntry, PodActiveSegmentRef, PodMetadata, PodMetadataStore, PodScopeSnapshot, SegmentId,
SessionId, Store, StoreError, SystemItem, segment_log, to_logged,
};
use tracing::{info, warn};
@ -50,6 +50,16 @@ pub struct SegmentLocation {
pub segment_id: SegmentId,
}
/// Type-erased, shareable callback that receives one [`PodMetadata`] value and
/// reports the outcome as `Result<(), StoreError>`.
///
/// Erasing the concrete store type behind `dyn Fn` lets a holder keep the
/// callback without naming any store trait bound itself.
type PodMetadataWriter = Arc<dyn Fn(PodMetadata) -> Result<(), StoreError> + Send + Sync>;
fn pod_metadata_writer_for_store<St>(store: &St) -> PodMetadataWriter
where
St: PodMetadataStore + Clone + Send + Sync + 'static,
{
let store = store.clone();
Arc::new(move |metadata| store.write(&metadata))
}
/// Lock-free shared session/segment pointer.
///
/// Holds the current `(SessionId, SegmentId)` pair and the append tally
@ -181,6 +191,10 @@ pub struct Pod<C: LlmClient, St: Store> {
/// Always `Some` outside of `run()`/`resume()`.
worker: Option<Worker<C, Mutable>>,
store: St,
/// Optional write-through hook for name-keyed Pod metadata. Production
/// constructors install this from the same FsStore that owns the session
/// logs; low-level `Pod::new` tests leave it absent.
pod_metadata_writer: Option<PodMetadataWriter>,
/// Shared session pointer. Source of truth for the Pod's current
/// `segment_id` and append tally. `self.segment_id()` is a thin
/// wrapper over `segment_state.segment_id()`.
@ -358,6 +372,7 @@ impl<C: LlmClient + Clone + 'static, St: Store + Clone + 'static> Pod<C, St> {
manifest: self.manifest.clone(),
worker: Some(worker),
store: self.store.clone(),
pod_metadata_writer: None,
segment_state: self.segment_state.clone(),
pwd: self.pwd.clone(),
scope: self.scope.clone(),
@ -499,6 +514,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
manifest,
worker: Some(worker),
store,
pod_metadata_writer: None,
segment_state: SegmentState::new(session_id, segment_id, 0),
pwd,
scope: SharedScope::new(scope),
@ -716,6 +732,41 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
&self.store
}
/// Write metadata for this Pod's name that points at the current Session
/// with no Segment attached yet.
///
/// Returns `Ok(())` without writing anything when no metadata writer is
/// installed; otherwise returns whatever the writer reports.
fn write_pod_metadata_pending(&self) -> Result<(), StoreError> {
    if let Some(write) = self.pod_metadata_writer.as_ref() {
        let pod_name = self.manifest.pod.name.clone();
        let pending = PodActiveSegmentRef::pending_segment(self.session_id());
        write(PodMetadata::new(pod_name, Some(pending)))
    } else {
        Ok(())
    }
}
/// Write metadata for this Pod's name that points at the Session and
/// Segment given by `loc`.
///
/// Returns `Ok(())` without writing anything when no metadata writer is
/// installed; otherwise returns whatever the writer reports.
fn write_pod_metadata_active(&self, loc: SegmentLocation) -> Result<(), StoreError> {
    match self.pod_metadata_writer.as_ref() {
        None => Ok(()),
        Some(write) => {
            let pod_name = self.manifest.pod.name.clone();
            let pointer = PodActiveSegmentRef::active_segment(loc.session_id, loc.segment_id);
            write(PodMetadata::new(pod_name, Some(pointer)))
        }
    }
}
/// Turn on name-keyed Pod metadata write-through for a Pod that was built
/// through the low-level constructor.
///
/// High-level manifest constructors install the writer themselves; this
/// method exists so tests and custom embedders can opt into the same
/// persistence without widening the bounds on `Pod::new`.
///
/// The writer is installed first and a *pending* pointer (Session only, no
/// Segment) is written immediately afterwards.
///
/// # Errors
///
/// Returns the `StoreError` from that initial pending write. The writer
/// stays installed even when the write fails.
pub fn enable_pod_metadata_write_through(&mut self) -> Result<(), StoreError>
where
    St: PodMetadataStore + Clone + Send + Sync + 'static,
{
    let writer = pod_metadata_writer_for_store(&self.store);
    self.pod_metadata_writer = Some(writer);
    self.write_pod_metadata_pending()
}
/// Current history items held by the underlying Worker.
pub fn history(&self) -> &[Item] {
self.worker().history()
@ -1660,6 +1711,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
};
self.commit_entry(initial)?;
self.persist_scope_snapshot()?;
self.write_pod_metadata_active(loc)?;
return Ok(());
}
// Check store count + auto-fork if it drifted.
@ -1703,6 +1755,10 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
if self.scope_allocation.is_some() {
pod_registry::update_segment(&self.manifest.pod.name, fork_segment_id)?;
}
self.write_pod_metadata_active(SegmentLocation {
session_id: loc.session_id,
segment_id: fork_segment_id,
})?;
Ok(())
}
@ -2220,6 +2276,10 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
if self.scope_allocation.is_some() {
pod_registry::update_segment(&self.manifest.pod.name, new_segment_id)?;
}
self.write_pod_metadata_active(SegmentLocation {
session_id: old_loc.session_id,
segment_id: new_segment_id,
})?;
// Align user_segments with the post-compaction history. Items
// before `retain_from` (now folded into the summary) lose their
// segments; only the user_messages surviving in retained_items
@ -2760,7 +2820,10 @@ enum ConsolidateDecision {
Completed,
}
impl<St: Store> Pod<Box<dyn LlmClient>, St> {
impl<St> Pod<Box<dyn LlmClient>, St>
where
St: Store + PodMetadataStore + Clone + Send + Sync + 'static,
{
/// Create a Pod entirely from a validated manifest.
///
/// The Pod's working directory is captured once here from the
@ -2808,11 +2871,13 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
let mut worker = Worker::new(common.client);
apply_worker_manifest(&mut worker, &manifest.worker);
worker.set_cache_key(Some(segment_id.to_string()));
let pod_metadata_writer = Some(pod_metadata_writer_for_store(&store));
let mut pod = Self {
manifest,
worker: Some(worker),
store,
pod_metadata_writer,
segment_state: SegmentState::new(session_id, segment_id, 0),
pwd: common.pwd,
scope: SharedScope::new(common.scope),
@ -2847,6 +2912,7 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
};
pod.apply_permissions_from_manifest();
pod.apply_prune_from_manifest();
pod.write_pod_metadata_pending()?;
drain_skill_shadows(&pod, skill_shadows);
Ok(pod)
}
@ -2881,11 +2947,13 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
let mut worker = Worker::new(common.client);
apply_worker_manifest(&mut worker, &manifest.worker);
worker.set_cache_key(Some(segment_id.to_string()));
let pod_metadata_writer = Some(pod_metadata_writer_for_store(&store));
let mut pod = Self {
manifest,
worker: Some(worker),
store,
pod_metadata_writer,
segment_state: SegmentState::new(session_id, segment_id, 0),
pwd: common.pwd,
scope: SharedScope::new(common.scope),
@ -2920,10 +2988,41 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
};
pod.apply_permissions_from_manifest();
pod.apply_prune_from_manifest();
pod.write_pod_metadata_pending()?;
drain_skill_shadows(&pod, skill_shadows);
Ok(pod)
}
/// Restore a Pod by looking up its name-keyed metadata, resolving it to an
/// active `(SessionId, SegmentId)` pair, and handing that pair to the normal
/// session-log restore path ([`Self::restore_from_manifest`]).
///
/// The metadata holds only the active pointer; lineage and origin remain
/// authoritative in the session log.
///
/// # Errors
///
/// - Any error from `read_by_name`, converted into `PodError`.
/// - `PodError::PodMetadataMissing` when no metadata exists for `pod_name`.
/// - `PodError::PodMetadataInactive` when the metadata has no active pointer.
/// - `PodError::PodMetadataPending` when the pointer names a Session but no
///   Segment.
/// - Whatever `restore_from_manifest` returns for the resolved pair.
pub async fn restore_from_pod_metadata(
    pod_name: &str,
    manifest: PodManifest,
    store: St,
    loader: PromptLoader,
) -> Result<Self, PodError> {
    let Some(metadata) = store.read_by_name(pod_name)? else {
        return Err(PodError::PodMetadataMissing {
            pod_name: pod_name.to_string(),
        });
    };

    let Some(active) = metadata.active else {
        return Err(PodError::PodMetadataInactive {
            pod_name: pod_name.to_string(),
        });
    };

    let session_id = active.session_id;
    let Some(segment_id) = active.segment_id else {
        return Err(PodError::PodMetadataPending {
            pod_name: pod_name.to_string(),
            session_id,
        });
    };

    Self::restore_from_manifest(session_id, segment_id, manifest, store, loader).await
}
/// Restore a Pod from an existing session log.
///
/// Resolves the manifest cascade exactly like [`Self::from_manifest`]
@ -3021,11 +3120,13 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
let extract_pointer = memory::extract::fold_pointer(&state.extensions);
let task_store = tools::TaskStore::from_history(&state.history);
let pod_metadata_writer = Some(pod_metadata_writer_for_store(&store));
let mut pod = Self {
manifest,
worker: Some(worker),
store,
pod_metadata_writer,
segment_state: SegmentState::new(session_id, segment_id, state.entries_count),
pwd: common.pwd,
scope: SharedScope::new(common.scope),
@ -3065,6 +3166,10 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
};
pod.apply_permissions_from_manifest();
pod.apply_prune_from_manifest();
pod.write_pod_metadata_active(SegmentLocation {
session_id,
segment_id,
})?;
drain_skill_shadows(&pod, skill_shadows);
Ok(pod)
}
@ -3293,6 +3398,20 @@ pub enum PodError {
"session {segment_id} has no persisted scope snapshot; refusing resume without explicit scope"
)]
SegmentScopeMissing { segment_id: SegmentId },
#[error("pod metadata for {pod_name} was not found")]
PodMetadataMissing { pod_name: String },
#[error("pod metadata for {pod_name} has no active session")]
PodMetadataInactive { pod_name: String },
#[error(
"pod metadata for {pod_name} points to session {session_id} but no segment is materialized yet"
)]
PodMetadataPending {
pod_name: String,
session_id: SessionId,
},
}
/// Bundle of resources that every high-level Pod constructor needs:

View File

@ -17,7 +17,7 @@ use llm_worker::llm_client::event::{Event as LlmEvent, ResponseStatus, StatusEve
use llm_worker::llm_client::types::Item;
use llm_worker::llm_client::{ClientError, LlmClient, Request};
use protocol::Event;
use session_store::{FsStore, LogEntry, Store};
use session_store::{FsStore, LogEntry, PodMetadataStore, Store};
use tokio::sync::broadcast;
use pod::Pod;
@ -158,7 +158,9 @@ async fn make_pod_with_manifest(
std::mem::forget(pwd_tmp);
let worker = Worker::new(client);
Pod::new(manifest, worker, store, pwd, scope).await.unwrap()
let mut pod = Pod::new(manifest, worker, store, pwd, scope).await.unwrap();
pod.enable_pod_metadata_write_through().unwrap();
pod
}
async fn make_pod(client: MockClient) -> Pod<MockClient, FsStore> {
@ -213,6 +215,36 @@ fn system_texts_in_sink_session_start(
Vec::new()
}
/// Right after construction the Pod's metadata names its Session but no
/// Segment; after the first `run_text` the same record names the Pod's
/// Segment as well.
#[tokio::test]
async fn pod_metadata_moves_from_pending_to_active_on_first_run() {
    let mut pod = make_pod(MockClient::new(vec![single_text_events("hi")])).await;
    let store = pod.store().clone();
    let (session_id, initial_segment_id) = (pod.session_id(), pod.segment_id());

    // Before any run: the pointer exists but carries no segment.
    let before_run = store
        .read_by_name("test-pod")
        .unwrap()
        .expect("metadata should be initialized at Pod construction");
    assert_eq!(before_run.pod_name, "test-pod");
    let reserved = before_run.active.expect("active session pointer missing");
    assert_eq!(reserved.session_id, session_id);
    assert_eq!(reserved.segment_id, None);

    pod.run_text("first").await.unwrap();

    // After the first run: same session, now with the initial segment.
    let after_run = store
        .read_by_name("test-pod")
        .unwrap()
        .expect("metadata should still exist after first run");
    let pointer = after_run.active.expect("active session pointer missing");
    assert_eq!(pointer.session_id, session_id);
    assert_eq!(pointer.segment_id, Some(initial_segment_id));
}
/// Live auto-fork: when another writer extends the segment behind the
/// Pod's back, the next run's `ensure_segment_head` detects the
/// entry-count drift and branches into a fresh segment **within the same
@ -274,6 +306,13 @@ permission = "write"
let new_segment_id = pod.segment_id();
assert_ne!(new_segment_id, source_segment_id);
assert_eq!(pod.session_id(), session_id, "auto-fork stays in-Session");
let metadata = store
.read_by_name("test-pod")
.unwrap()
.expect("metadata should exist after auto-fork");
let active = metadata.active.expect("active session pointer missing");
assert_eq!(active.session_id, session_id);
assert_eq!(active.segment_id, Some(new_segment_id));
// New segment records forked_from pointing at the source.
let new_entries = store.read_all(session_id, new_segment_id).unwrap();
@ -312,7 +351,17 @@ async fn compact_emits_session_start_carrying_summary_and_task_snapshot() {
pod.attach_event_tx(tx);
pod.run_text("first").await.unwrap();
let session_id = pod.session_id();
pod.compact(10_000).await.unwrap();
let compacted_segment_id = pod.segment_id();
let metadata = pod
.store()
.read_by_name("test-pod")
.unwrap()
.expect("metadata should exist after compaction");
let active = metadata.active.expect("active session pointer missing");
assert_eq!(active.session_id, session_id);
assert_eq!(active.segment_id, Some(compacted_segment_id));
let system_texts = system_texts_in_sink_session_start(&pod);
// The post-compaction `SegmentStart.history` carries the new system

View File

@ -417,21 +417,23 @@ async fn events_are_broadcast() {
#[tokio::test]
async fn double_run_returns_error() {
// Create a client that streams slowly
// Keep the first turn in-flight until the test drops the handle. A
// finite stream can finish before the second Method reaches the
// controller in the full test suite, making this assertion racy.
let events = vec![
LlmEvent::text_block_start(0),
LlmEvent::text_delta(0, "slow..."),
// No stop/completed — the stream will end but without proper completion
];
let client = MockClient::new(events);
let client = MockClient::sequential(vec![MockResponse::Hang(events)]);
let pod = make_pod(client).await;
let handle = spawn_controller(pod).await;
let mut rx = handle.subscribe();
// Send first run
// Send first run and wait until the controller has entered Running.
handle.send(Method::run_text("first")).await.unwrap();
wait_for_status(&handle, PodStatus::Running).await;
// Immediately send second run (should get error)
// Now the second run must be rejected by drive_turn's live Method arm.
handle.send(Method::run_text("second")).await.unwrap();
// Look for the error event

View File

@ -8,7 +8,7 @@
use std::sync::{LazyLock, Mutex};
use pod::{Pod, PodError};
use session_store::{FsStore, StoreError};
use session_store::{FsStore, PodActiveSegmentRef, PodMetadata, PodMetadataStore, StoreError};
const MINIMAL_MANIFEST_TOML: &str = r#"
[pod]
@ -31,6 +31,96 @@ permission = "write"
/// pattern used by other integration tests in this crate.
static ENV_LOCK: LazyLock<Mutex<()>> = LazyLock::new(|| Mutex::new(()));
/// Restoring by name from a store that holds no metadata for that name must
/// fail with `PodError::PodMetadataMissing` carrying the requested name.
#[tokio::test]
async fn restore_from_pod_metadata_rejects_missing_metadata() {
    // Hold the file-wide lock for the whole test; a poisoned lock is
    // recovered so an earlier failing test does not cascade into this one.
    let _lock = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
    let store_dir = tempfile::tempdir().unwrap();
    let empty_store = FsStore::new(store_dir.path()).unwrap();
    let manifest = pod::PodManifest::from_toml(MINIMAL_MANIFEST_TOML).unwrap();

    let outcome = Pod::restore_from_pod_metadata(
        "restore-test",
        manifest,
        empty_store,
        pod::PromptLoader::builtins_only(),
    )
    .await;

    let Err(err) = outcome else {
        panic!("expected missing pod metadata to fail");
    };
    match err {
        PodError::PodMetadataMissing { pod_name } => assert_eq!(pod_name, "restore-test"),
        other => panic!("expected PodMetadataMissing, got {other:?}"),
    }
}
/// Restoring by name when the stored pointer names a Session but no Segment
/// must fail with `PodError::PodMetadataPending`, echoing both the Pod name
/// and the stored session id.
#[tokio::test]
async fn restore_from_pod_metadata_rejects_pending_segment() {
    // Hold the file-wide lock for the whole test; a poisoned lock is
    // recovered so an earlier failing test does not cascade into this one.
    let _lock = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
    let store_dir = tempfile::tempdir().unwrap();
    let store = FsStore::new(store_dir.path()).unwrap();
    let manifest = pod::PodManifest::from_toml(MINIMAL_MANIFEST_TOML).unwrap();
    let session_id = session_store::new_session_id();

    // Seed the store with a pending pointer: session only, no segment.
    let pending = PodMetadata::new(
        "restore-test",
        Some(PodActiveSegmentRef::pending_segment(session_id)),
    );
    store.write(&pending).unwrap();

    let outcome = Pod::restore_from_pod_metadata(
        "restore-test",
        manifest,
        store,
        pod::PromptLoader::builtins_only(),
    )
    .await;

    let Err(err) = outcome else {
        panic!("expected pending pod metadata to fail");
    };
    match err {
        PodError::PodMetadataPending {
            pod_name,
            session_id: actual,
        } => {
            assert_eq!(pod_name, "restore-test");
            assert_eq!(actual, session_id);
        }
        other => panic!("expected PodMetadataPending, got {other:?}"),
    }
}
/// A fully resolved pointer is handed on to the session-log restore path.
/// The ids written here are freshly generated and no session log is written
/// for them, so the expected outcome is `StoreError::NotFound` carrying the
/// segment id taken from the metadata.
#[tokio::test]
async fn restore_from_pod_metadata_resolves_active_pointer_through_session_log() {
    // Hold the file-wide lock for the whole test; a poisoned lock is
    // recovered so an earlier failing test does not cascade into this one.
    let _lock = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
    let store_dir = tempfile::tempdir().unwrap();
    let store = FsStore::new(store_dir.path()).unwrap();
    let manifest = pod::PodManifest::from_toml(MINIMAL_MANIFEST_TOML).unwrap();
    let session_id = session_store::new_session_id();
    let segment_id = session_store::new_segment_id();

    // Seed the store with an active pointer to a segment that has no log.
    let active = PodMetadata::new(
        "restore-test",
        Some(PodActiveSegmentRef::active_segment(session_id, segment_id)),
    );
    store.write(&active).unwrap();

    let outcome = Pod::restore_from_pod_metadata(
        "restore-test",
        manifest,
        store,
        pod::PromptLoader::builtins_only(),
    )
    .await;

    let Err(err) = outcome else {
        panic!("expected unknown resolved segment to fail");
    };
    match err {
        PodError::Store(StoreError::NotFound(id)) => assert_eq!(id, segment_id),
        other => panic!("expected Store(NotFound) from resolved segment, got {other:?}"),
    }
}
#[tokio::test]
async fn restore_from_manifest_rejects_unknown_segment() {
let _lock = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());