feat: add pod metadata store backend
This commit is contained in:
parent
67f135fbc6
commit
b13c2735bb
|
|
@ -3,6 +3,7 @@
|
||||||
//! Layout:
|
//! Layout:
|
||||||
//! - Segment log: `{root}/{session_id}/{segment_id}.jsonl`
|
//! - Segment log: `{root}/{session_id}/{segment_id}.jsonl`
|
||||||
//! - Event trace: `{root}/{session_id}/{segment_id}.trace.jsonl`
|
//! - Event trace: `{root}/{session_id}/{segment_id}.trace.jsonl`
|
||||||
|
//! - Pod metadata: `{root}/pods/{pod_name}/metadata.json`
|
||||||
//!
|
//!
|
||||||
//! The per-Session directory makes `list_segments(session_id)` an O(dir)
|
//! The per-Session directory makes `list_segments(session_id)` an O(dir)
|
||||||
//! scan and gives the fork tree a visible grouping in the filesystem.
|
//! scan and gives the fork tree a visible grouping in the filesystem.
|
||||||
|
|
@ -16,6 +17,7 @@
|
||||||
//! enumerable by the picker.
|
//! enumerable by the picker.
|
||||||
|
|
||||||
use crate::event_trace::TraceEntry;
|
use crate::event_trace::TraceEntry;
|
||||||
|
use crate::pod_metadata::{PodMetadata, PodMetadataStore, validate_pod_name};
|
||||||
use crate::segment_log::LogEntry;
|
use crate::segment_log::LogEntry;
|
||||||
use crate::store::{Store, StoreError};
|
use crate::store::{Store, StoreError};
|
||||||
use crate::{SegmentId, SessionId};
|
use crate::{SegmentId, SessionId};
|
||||||
|
|
@ -55,6 +57,19 @@ impl FsStore {
|
||||||
.join(format!("{segment_id}.trace.jsonl"))
|
.join(format!("{segment_id}.trace.jsonl"))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn pods_dir(&self) -> PathBuf {
|
||||||
|
self.root.join("pods")
|
||||||
|
}
|
||||||
|
|
||||||
|
fn pod_dir(&self, pod_name: &str) -> Result<PathBuf, StoreError> {
|
||||||
|
validate_pod_name(pod_name)?;
|
||||||
|
Ok(self.pods_dir().join(pod_name))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn pod_metadata_path(&self, pod_name: &str) -> Result<PathBuf, StoreError> {
|
||||||
|
Ok(self.pod_dir(pod_name)?.join("metadata.json"))
|
||||||
|
}
|
||||||
|
|
||||||
fn append_line(&self, path: &Path, line: &str) -> Result<(), StoreError> {
|
fn append_line(&self, path: &Path, line: &str) -> Result<(), StoreError> {
|
||||||
if let Some(parent) = path.parent() {
|
if let Some(parent) = path.parent() {
|
||||||
fs::create_dir_all(parent)?;
|
fs::create_dir_all(parent)?;
|
||||||
|
|
@ -84,6 +99,41 @@ impl FsStore {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl PodMetadataStore for FsStore {
|
||||||
|
fn write(&self, metadata: &PodMetadata) -> Result<(), StoreError> {
|
||||||
|
let path = self.pod_metadata_path(&metadata.pod_name)?;
|
||||||
|
if let Some(parent) = path.parent() {
|
||||||
|
fs::create_dir_all(parent)?;
|
||||||
|
}
|
||||||
|
let content = serde_json::to_vec_pretty(metadata)?;
|
||||||
|
fs::write(path, content)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn read_by_name(&self, pod_name: &str) -> Result<Option<PodMetadata>, StoreError> {
|
||||||
|
let path = self.pod_metadata_path(pod_name)?;
|
||||||
|
let content = match fs::read_to_string(path) {
|
||||||
|
Ok(content) => content,
|
||||||
|
Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(None),
|
||||||
|
Err(err) => return Err(StoreError::Io(err)),
|
||||||
|
};
|
||||||
|
Ok(Some(serde_json::from_str(&content)?))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn delete_by_name(&self, pod_name: &str) -> Result<(), StoreError> {
|
||||||
|
let path = self.pod_metadata_path(pod_name)?;
|
||||||
|
match fs::remove_file(&path) {
|
||||||
|
Ok(()) => {}
|
||||||
|
Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(()),
|
||||||
|
Err(err) => return Err(StoreError::Io(err)),
|
||||||
|
}
|
||||||
|
if let Some(parent) = path.parent() {
|
||||||
|
let _ = fs::remove_dir(parent);
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl Store for FsStore {
|
impl Store for FsStore {
|
||||||
fn append(
|
fn append(
|
||||||
&self,
|
&self,
|
||||||
|
|
|
||||||
|
|
@ -33,6 +33,7 @@
|
||||||
pub mod event_trace;
|
pub mod event_trace;
|
||||||
pub mod fs_store;
|
pub mod fs_store;
|
||||||
pub mod logged_item;
|
pub mod logged_item;
|
||||||
|
pub mod pod_metadata;
|
||||||
pub mod segment;
|
pub mod segment;
|
||||||
pub mod segment_log;
|
pub mod segment_log;
|
||||||
pub mod store;
|
pub mod store;
|
||||||
|
|
@ -43,6 +44,7 @@ pub use fs_store::FsStore;
|
||||||
pub use llm_worker::UsageRecord;
|
pub use llm_worker::UsageRecord;
|
||||||
pub use llm_worker::llm_client::types::{ContentPart, Item, Role};
|
pub use llm_worker::llm_client::types::{ContentPart, Item, Role};
|
||||||
pub use logged_item::{LoggedContentPart, LoggedItem, LoggedRole, from_logged, to_logged};
|
pub use logged_item::{LoggedContentPart, LoggedItem, LoggedRole, from_logged, to_logged};
|
||||||
|
pub use pod_metadata::{PodActiveSegmentRef, PodMetadata, PodMetadataStore};
|
||||||
pub use segment::{
|
pub use segment::{
|
||||||
SegmentStartState, append_entry, append_system_item, classify_history_item,
|
SegmentStartState, append_entry, append_system_item, classify_history_item,
|
||||||
create_compacted_segment, create_segment, create_segment_with_ids, ensure_head_or_fork, fork,
|
create_compacted_segment, create_segment, create_segment_with_ids, ensure_head_or_fork, fork,
|
||||||
|
|
|
||||||
85
crates/session-store/src/pod_metadata.rs
Normal file
85
crates/session-store/src/pod_metadata.rs
Normal file
|
|
@ -0,0 +1,85 @@
|
||||||
|
//! Pod metadata persistence API.
|
||||||
|
//!
|
||||||
|
//! Pod metadata is a lightweight name-keyed pointer to the Session/Segment
|
||||||
|
//! currently active for a Pod. Conversation content remains in the segment log;
|
||||||
|
//! this metadata only records references needed by Pod-name resume/attach flows.
|
||||||
|
|
||||||
|
use crate::store::StoreError;
|
||||||
|
use crate::{SegmentId, SessionId};
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
/// Active Session/Segment pointer for a Pod.
|
||||||
|
///
|
||||||
|
/// `segment_id` is optional so callers can persist a reserved Session before
|
||||||
|
/// the first Segment ID is known. Once a segment exists, callers should rewrite
|
||||||
|
/// the metadata with `Some(segment_id)`.
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
|
pub struct PodActiveSegmentRef {
|
||||||
|
pub session_id: SessionId,
|
||||||
|
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||||
|
pub segment_id: Option<SegmentId>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PodActiveSegmentRef {
|
||||||
|
/// Create a reference whose active Segment is not known yet.
|
||||||
|
pub fn pending_segment(session_id: SessionId) -> Self {
|
||||||
|
Self {
|
||||||
|
session_id,
|
||||||
|
segment_id: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Create a fully resolved active Session/Segment reference.
|
||||||
|
pub fn active_segment(session_id: SessionId, segment_id: SegmentId) -> Self {
|
||||||
|
Self {
|
||||||
|
session_id,
|
||||||
|
segment_id: Some(segment_id),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Persistent metadata for a Pod name.
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
|
pub struct PodMetadata {
|
||||||
|
pub pod_name: String,
|
||||||
|
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||||
|
pub active: Option<PodActiveSegmentRef>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PodMetadata {
|
||||||
|
/// Create Pod metadata for `pod_name`.
|
||||||
|
pub fn new(pod_name: impl Into<String>, active: Option<PodActiveSegmentRef>) -> Self {
|
||||||
|
Self {
|
||||||
|
pod_name: pod_name.into(),
|
||||||
|
active,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Sync persistence backend for Pod metadata.
|
||||||
|
///
|
||||||
|
/// The key is the Pod name. Missing state is not an error: `read_by_name`
|
||||||
|
/// returns `Ok(None)` for Pods that have never persisted metadata or whose
|
||||||
|
/// metadata was deleted.
|
||||||
|
pub trait PodMetadataStore: Send + Sync {
|
||||||
|
/// Create or replace metadata for its `pod_name` key.
|
||||||
|
fn write(&self, metadata: &PodMetadata) -> Result<(), StoreError>;
|
||||||
|
|
||||||
|
/// Read metadata by Pod name. Returns `None` when no metadata exists.
|
||||||
|
fn read_by_name(&self, pod_name: &str) -> Result<Option<PodMetadata>, StoreError>;
|
||||||
|
|
||||||
|
/// Delete metadata by Pod name. Missing metadata is a successful no-op.
|
||||||
|
fn delete_by_name(&self, pod_name: &str) -> Result<(), StoreError>;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn validate_pod_name(pod_name: &str) -> Result<(), StoreError> {
|
||||||
|
if pod_name.is_empty()
|
||||||
|
|| pod_name == "."
|
||||||
|
|| pod_name == ".."
|
||||||
|
|| pod_name.contains('/')
|
||||||
|
|| pod_name.contains('\0')
|
||||||
|
{
|
||||||
|
return Err(StoreError::InvalidPodName(pod_name.to_string()));
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
@ -29,6 +29,9 @@ pub enum StoreError {
|
||||||
|
|
||||||
#[error("log corrupted at line {line}: {message}")]
|
#[error("log corrupted at line {line}: {message}")]
|
||||||
Corrupt { line: usize, message: String },
|
Corrupt { line: usize, message: String },
|
||||||
|
|
||||||
|
#[error("invalid pod name: {0}")]
|
||||||
|
InvalidPodName(String),
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Sync persistence backend for segment logs.
|
/// Sync persistence backend for segment logs.
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,8 @@
|
||||||
use llm_worker::WorkerResult;
|
use llm_worker::WorkerResult;
|
||||||
use llm_worker::llm_client::types::{Item, RequestConfig};
|
use llm_worker::llm_client::types::{Item, RequestConfig};
|
||||||
use session_store::{
|
use session_store::{
|
||||||
FsStore, LogEntry, Store, TraceEntry, collect_state, new_segment_id, new_session_id,
|
FsStore, LogEntry, PodActiveSegmentRef, PodMetadata, PodMetadataStore, Store, TraceEntry,
|
||||||
|
collect_state, new_segment_id, new_session_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
fn nil_session_start(ts: u64, session_id: uuid::Uuid) -> LogEntry {
|
fn nil_session_start(ts: u64, session_id: uuid::Uuid) -> LogEntry {
|
||||||
|
|
@ -224,3 +225,39 @@ fn lookup_session_of_finds_owning_session() {
|
||||||
|
|
||||||
assert_eq!(store.lookup_session_of(segid).unwrap(), Some(sid));
|
assert_eq!(store.lookup_session_of(segid).unwrap(), Some(sid));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn pod_metadata_minimal_crud() {
|
||||||
|
let dir = tempfile::tempdir().unwrap();
|
||||||
|
let store = FsStore::new(dir.path()).unwrap();
|
||||||
|
let pod_name = "worker-a";
|
||||||
|
let sid = new_session_id();
|
||||||
|
let segid = new_segment_id();
|
||||||
|
|
||||||
|
assert_eq!(store.read_by_name(pod_name).unwrap(), None);
|
||||||
|
|
||||||
|
let pending = PodMetadata::new(pod_name, Some(PodActiveSegmentRef::pending_segment(sid)));
|
||||||
|
store.write(&pending).unwrap();
|
||||||
|
assert_eq!(store.read_by_name(pod_name).unwrap(), Some(pending.clone()));
|
||||||
|
assert!(
|
||||||
|
dir.path()
|
||||||
|
.join("pods")
|
||||||
|
.join(pod_name)
|
||||||
|
.join("metadata.json")
|
||||||
|
.exists(),
|
||||||
|
"Pod metadata must live under <data_dir>/pods/<pod_name>/"
|
||||||
|
);
|
||||||
|
|
||||||
|
let resolved = PodMetadata::new(
|
||||||
|
pod_name,
|
||||||
|
Some(PodActiveSegmentRef::active_segment(sid, segid)),
|
||||||
|
);
|
||||||
|
store.write(&resolved).unwrap();
|
||||||
|
assert_eq!(store.read_by_name(pod_name).unwrap(), Some(resolved));
|
||||||
|
|
||||||
|
store.delete_by_name(pod_name).unwrap();
|
||||||
|
assert_eq!(store.read_by_name(pod_name).unwrap(), None);
|
||||||
|
|
||||||
|
// Delete is idempotent for missing metadata.
|
||||||
|
store.delete_by_name(pod_name).unwrap();
|
||||||
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue
Block a user