diff --git a/crates/session-store/src/fs_store.rs b/crates/session-store/src/fs_store.rs index a689f902..7a8738ed 100644 --- a/crates/session-store/src/fs_store.rs +++ b/crates/session-store/src/fs_store.rs @@ -3,6 +3,7 @@ //! Layout: //! - Segment log: `{root}/{session_id}/{segment_id}.jsonl` //! - Event trace: `{root}/{session_id}/{segment_id}.trace.jsonl` +//! - Pod metadata: `{root}/pods/{pod_name}/metadata.json` //! //! The per-Session directory makes `list_segments(session_id)` an O(dir) //! scan and gives the fork tree a visible grouping in the filesystem. @@ -16,6 +17,7 @@ //! enumerable by the picker. use crate::event_trace::TraceEntry; +use crate::pod_metadata::{PodMetadata, PodMetadataStore, validate_pod_name}; use crate::segment_log::LogEntry; use crate::store::{Store, StoreError}; use crate::{SegmentId, SessionId}; @@ -55,6 +57,19 @@ impl FsStore { .join(format!("{segment_id}.trace.jsonl")) } + fn pods_dir(&self) -> PathBuf { + self.root.join("pods") + } + + fn pod_dir(&self, pod_name: &str) -> Result { + validate_pod_name(pod_name)?; + Ok(self.pods_dir().join(pod_name)) + } + + fn pod_metadata_path(&self, pod_name: &str) -> Result { + Ok(self.pod_dir(pod_name)?.join("metadata.json")) + } + fn append_line(&self, path: &Path, line: &str) -> Result<(), StoreError> { if let Some(parent) = path.parent() { fs::create_dir_all(parent)?; @@ -84,6 +99,41 @@ impl FsStore { } } +impl PodMetadataStore for FsStore { + fn write(&self, metadata: &PodMetadata) -> Result<(), StoreError> { + let path = self.pod_metadata_path(&metadata.pod_name)?; + if let Some(parent) = path.parent() { + fs::create_dir_all(parent)?; + } + let content = serde_json::to_vec_pretty(metadata)?; + fs::write(path, content)?; + Ok(()) + } + + fn read_by_name(&self, pod_name: &str) -> Result, StoreError> { + let path = self.pod_metadata_path(pod_name)?; + let content = match fs::read_to_string(path) { + Ok(content) => content, + Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(None), + Err(err) => return Err(StoreError::Io(err)), + }; + Ok(Some(serde_json::from_str(&content)?)) + } + + fn delete_by_name(&self, pod_name: &str) -> Result<(), StoreError> { + let path = self.pod_metadata_path(pod_name)?; + match fs::remove_file(&path) { + Ok(()) => {} + Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(()), + Err(err) => return Err(StoreError::Io(err)), + } + if let Some(parent) = path.parent() { + let _ = fs::remove_dir(parent); + } + Ok(()) + } +} + impl Store for FsStore { fn append( &self, diff --git a/crates/session-store/src/lib.rs b/crates/session-store/src/lib.rs index d2754081..c92608a7 100644 --- a/crates/session-store/src/lib.rs +++ b/crates/session-store/src/lib.rs @@ -33,6 +33,7 @@ pub mod event_trace; pub mod fs_store; pub mod logged_item; +pub mod pod_metadata; pub mod segment; pub mod segment_log; pub mod store; @@ -43,6 +44,7 @@ pub use fs_store::FsStore; pub use llm_worker::UsageRecord; pub use llm_worker::llm_client::types::{ContentPart, Item, Role}; pub use logged_item::{LoggedContentPart, LoggedItem, LoggedRole, from_logged, to_logged}; +pub use pod_metadata::{PodActiveSegmentRef, PodMetadata, PodMetadataStore}; pub use segment::{ SegmentStartState, append_entry, append_system_item, classify_history_item, create_compacted_segment, create_segment, create_segment_with_ids, ensure_head_or_fork, fork, diff --git a/crates/session-store/src/pod_metadata.rs b/crates/session-store/src/pod_metadata.rs new file mode 100644 index 00000000..b031b06d --- /dev/null +++ b/crates/session-store/src/pod_metadata.rs @@ -0,0 +1,85 @@ +//! Pod metadata persistence API. +//! +//! Pod metadata is a lightweight name-keyed pointer to the Session/Segment +//! currently active for a Pod. Conversation content remains in the segment log; +//! this metadata only records references needed by Pod-name resume/attach flows. + +use crate::store::StoreError; +use crate::{SegmentId, SessionId}; +use serde::{Deserialize, Serialize}; + +/// Active Session/Segment pointer for a Pod. +/// +/// `segment_id` is optional so callers can persist a reserved Session before +/// the first Segment ID is known. Once a segment exists, callers should rewrite +/// the metadata with `Some(segment_id)`. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct PodActiveSegmentRef { + pub session_id: SessionId, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub segment_id: Option, +} + +impl PodActiveSegmentRef { + /// Create a reference whose active Segment is not known yet. + pub fn pending_segment(session_id: SessionId) -> Self { + Self { + session_id, + segment_id: None, + } + } + + /// Create a fully resolved active Session/Segment reference. + pub fn active_segment(session_id: SessionId, segment_id: SegmentId) -> Self { + Self { + session_id, + segment_id: Some(segment_id), + } + } +} + +/// Persistent metadata for a Pod name. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct PodMetadata { + pub pod_name: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub active: Option, +} + +impl PodMetadata { + /// Create Pod metadata for `pod_name`. + pub fn new(pod_name: impl Into, active: Option) -> Self { + Self { + pod_name: pod_name.into(), + active, + } + } +} + +/// Sync persistence backend for Pod metadata. +/// +/// The key is the Pod name. Missing state is not an error: `read_by_name` +/// returns `Ok(None)` for Pods that have never persisted metadata or whose +/// metadata was deleted. +pub trait PodMetadataStore: Send + Sync { + /// Create or replace metadata for its `pod_name` key. + fn write(&self, metadata: &PodMetadata) -> Result<(), StoreError>; + + /// Read metadata by Pod name. Returns `None` when no metadata exists. + fn read_by_name(&self, pod_name: &str) -> Result, StoreError>; + + /// Delete metadata by Pod name. Missing metadata is a successful no-op. + fn delete_by_name(&self, pod_name: &str) -> Result<(), StoreError>; +} + +pub(crate) fn validate_pod_name(pod_name: &str) -> Result<(), StoreError> { + if pod_name.is_empty() + || pod_name == "." + || pod_name == ".." + || pod_name.contains('/') + || pod_name.contains('\0') + { + return Err(StoreError::InvalidPodName(pod_name.to_string())); + } + Ok(()) +} diff --git a/crates/session-store/src/store.rs b/crates/session-store/src/store.rs index 0896395d..7e181794 100644 --- a/crates/session-store/src/store.rs +++ b/crates/session-store/src/store.rs @@ -29,6 +29,9 @@ pub enum StoreError { #[error("log corrupted at line {line}: {message}")] Corrupt { line: usize, message: String }, + + #[error("invalid pod name: {0}")] + InvalidPodName(String), } /// Sync persistence backend for segment logs. diff --git a/crates/session-store/tests/fs_store_test.rs b/crates/session-store/tests/fs_store_test.rs index 6cb60963..5c817f8d 100644 --- a/crates/session-store/tests/fs_store_test.rs +++ b/crates/session-store/tests/fs_store_test.rs @@ -1,7 +1,8 @@ use llm_worker::WorkerResult; use llm_worker::llm_client::types::{Item, RequestConfig}; use session_store::{ - FsStore, LogEntry, Store, TraceEntry, collect_state, new_segment_id, new_session_id, + FsStore, LogEntry, PodActiveSegmentRef, PodMetadata, PodMetadataStore, Store, TraceEntry, + collect_state, new_segment_id, new_session_id, }; fn nil_session_start(ts: u64, session_id: uuid::Uuid) -> LogEntry { @@ -224,3 +225,39 @@ fn lookup_session_of_finds_owning_session() { assert_eq!(store.lookup_session_of(segid).unwrap(), Some(sid)); } + +#[test] +fn pod_metadata_minimal_crud() { + let dir = tempfile::tempdir().unwrap(); + let store = FsStore::new(dir.path()).unwrap(); + let pod_name = "worker-a"; + let sid = new_session_id(); + let segid = new_segment_id(); + + assert_eq!(store.read_by_name(pod_name).unwrap(), None); + + let pending = PodMetadata::new(pod_name, Some(PodActiveSegmentRef::pending_segment(sid))); + store.write(&pending).unwrap(); + assert_eq!(store.read_by_name(pod_name).unwrap(), Some(pending.clone())); + assert!( + dir.path() + .join("pods") + .join(pod_name) + .join("metadata.json") + .exists(), + "Pod metadata must live under /pods//" + ); + + let resolved = PodMetadata::new( + pod_name, + Some(PodActiveSegmentRef::active_segment(sid, segid)), + ); + store.write(&resolved).unwrap(); + assert_eq!(store.read_by_name(pod_name).unwrap(), Some(resolved)); + + store.delete_by_name(pod_name).unwrap(); + assert_eq!(store.read_by_name(pod_name).unwrap(), None); + + // Delete is idempotent for missing metadata. + store.delete_by_name(pod_name).unwrap(); +}