merge: old pod crate cleanup

This commit is contained in:
Keisuke Hirata 2026-06-29 05:04:59 +09:00
commit 83d433bf7e
No known key found for this signature in database
57 changed files with 432 additions and 493 deletions

31
Cargo.lock generated
View File

@ -2880,31 +2880,6 @@ version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4596b6d070b27117e987119b4dac604f3c58cfb0b191112e24771b2faeac1a6" checksum = "b4596b6d070b27117e987119b4dac604f3c58cfb0b191112e24771b2faeac1a6"
[[package]]
name = "pod-registry"
version = "0.1.0"
dependencies = [
"fs4",
"libc",
"manifest",
"serde",
"serde_json",
"session-store",
"tempfile",
"thiserror 2.0.18",
]
[[package]]
name = "pod-store"
version = "0.1.0"
dependencies = [
"serde",
"serde_json",
"session-store",
"tempfile",
"thiserror 2.0.18",
]
[[package]] [[package]]
name = "pom" name = "pom"
version = "1.1.0" version = "1.1.0"
@ -4688,11 +4663,10 @@ dependencies = [
"base64", "base64",
"client", "client",
"crossterm 0.28.1", "crossterm 0.28.1",
"fs4",
"llm-engine", "llm-engine",
"manifest", "manifest",
"minijinja", "minijinja",
"pod-registry",
"pod-store",
"protocol", "protocol",
"provider", "provider",
"pulldown-cmark", "pulldown-cmark",
@ -5910,8 +5884,6 @@ dependencies = [
"mcp", "mcp",
"memory", "memory",
"minijinja", "minijinja",
"pod-registry",
"pod-store",
"protocol", "protocol",
"provider", "provider",
"reqwest", "reqwest",
@ -5994,7 +5966,6 @@ dependencies = [
"client", "client",
"manifest", "manifest",
"memory", "memory",
"pod-store",
"project-record", "project-record",
"serde", "serde",
"serde_json", "serde_json",

View File

@ -12,10 +12,8 @@ members = [
"crates/worker-runtime", "crates/worker-runtime",
"crates/plugin-pdk", "crates/plugin-pdk",
"crates/yoi", "crates/yoi",
"crates/pod-store",
"crates/protocol", "crates/protocol",
"crates/provider", "crates/provider",
"crates/pod-registry",
"crates/session-metrics", "crates/session-metrics",
"crates/session-analytics", "crates/session-analytics",
"crates/lint-common", "crates/lint-common",
@ -40,10 +38,8 @@ default-members = [
"crates/worker-runtime", "crates/worker-runtime",
"crates/plugin-pdk", "crates/plugin-pdk",
"crates/yoi", "crates/yoi",
"crates/pod-store",
"crates/protocol", "crates/protocol",
"crates/provider", "crates/provider",
"crates/pod-registry",
"crates/session-metrics", "crates/session-metrics",
"crates/session-analytics", "crates/session-analytics",
"crates/lint-common", "crates/lint-common",
@ -75,8 +71,6 @@ worker = { path = "crates/worker" }
worker-runtime = { path = "crates/worker-runtime" } worker-runtime = { path = "crates/worker-runtime" }
yoi-plugin-pdk = { path = "crates/plugin-pdk" } yoi-plugin-pdk = { path = "crates/plugin-pdk" }
yoi = { path = "crates/yoi" } yoi = { path = "crates/yoi" }
pod-registry = { path = "crates/pod-registry" }
pod-store = { path = "crates/pod-store" }
protocol = { path = "crates/protocol" } protocol = { path = "crates/protocol" }
provider = { path = "crates/provider" } provider = { path = "crates/provider" }
session-metrics = { path = "crates/session-metrics" } session-metrics = { path = "crates/session-metrics" }

View File

@ -16,7 +16,7 @@ Owns:
Does not own: Does not own:
- product command names (`yoi`) - product command names (`yoi`)
- Worker state authority (`worker`, `pod-store`, `session-store`) - Worker state authority (`worker`, `session-store` worker metadata)
- UI rendering (`tui`) - UI rendering (`tui`)
- Engine turn semantics (`llm-engine`) - Engine turn semantics (`llm-engine`)

View File

@ -19,7 +19,7 @@ Does not own:
- Worker names, sockets, process lifecycle, or scope delegation (`worker`) - Worker names, sockets, process lifecycle, or scope delegation (`worker`)
- product CLI shape (`yoi`) - product CLI shape (`yoi`)
- provider catalog and secret resolution (`provider`, `secrets`) - provider catalog and secret resolution (`provider`, `secrets`)
- durable Worker current state (`pod-store`) - durable Worker current state (`session-store` worker metadata)
## Design notes ## Design notes

View File

@ -88,9 +88,9 @@ pub fn sessions_dir() -> Option<PathBuf> {
sessions_dir_from_data_dir(data_dir()) sessions_dir_from_data_dir(data_dir())
} }
/// `<runtime_dir>/workers.json` — machine-wide Worker allocation registry /// `<runtime_dir>/workers.json` — machine-wide Worker allocation table
pub fn pod_registry_path() -> Option<PathBuf> { pub fn worker_allocation_path() -> Option<PathBuf> {
pod_registry_path_from_runtime_dir(runtime_dir()) worker_allocation_path_from_runtime_dir(runtime_dir())
} }
/// `<runtime_dir>/<worker_name>/` — Worker ごとのランタイムディレクトリ。 /// `<runtime_dir>/<worker_name>/` — Worker ごとのランタイムディレクトリ。
@ -104,8 +104,8 @@ pub fn worker_runtime_dir(worker_name: &str) -> Option<PathBuf> {
/// `RuntimeDir::socket_path()` で、Worker 名が分かっている外部 (TUI の /// `RuntimeDir::socket_path()` で、Worker 名が分かっている外部 (TUI の
/// attach フロー等) からの**予測**はこの関数で行う。両者は同じパス /// attach フロー等) からの**予測**はこの関数で行う。両者は同じパス
/// を返すことが期待される。 /// を返すことが期待される。
pub fn pod_socket_path(worker_name: &str) -> Option<PathBuf> { pub fn worker_socket_path(worker_name: &str) -> Option<PathBuf> {
pod_socket_path_from_runtime_dir(runtime_dir(), worker_name) worker_socket_path_from_runtime_dir(runtime_dir(), worker_name)
} }
// ---- internals -------------------------------------------------------------- // ---- internals --------------------------------------------------------------
@ -183,7 +183,7 @@ fn sessions_dir_from_data_dir(data_dir: Option<PathBuf>) -> Option<PathBuf> {
Some(data_dir?.join("sessions")) Some(data_dir?.join("sessions"))
} }
fn pod_registry_path_from_runtime_dir(runtime_dir: Option<PathBuf>) -> Option<PathBuf> { fn worker_allocation_path_from_runtime_dir(runtime_dir: Option<PathBuf>) -> Option<PathBuf> {
Some(runtime_dir?.join("workers.json")) Some(runtime_dir?.join("workers.json"))
} }
@ -194,7 +194,7 @@ fn worker_runtime_dir_from_runtime_dir(
Some(runtime_dir?.join(worker_name)) Some(runtime_dir?.join(worker_name))
} }
fn pod_socket_path_from_runtime_dir( fn worker_socket_path_from_runtime_dir(
runtime_dir: Option<PathBuf>, runtime_dir: Option<PathBuf>,
worker_name: &str, worker_name: &str,
) -> Option<PathBuf> { ) -> Option<PathBuf> {
@ -396,7 +396,7 @@ mod tests {
PathBuf::from("/sand/sessions") PathBuf::from("/sand/sessions")
); );
assert_eq!( assert_eq!(
pod_registry_path_from_runtime_dir(runtime_dir.clone()).unwrap(), worker_allocation_path_from_runtime_dir(runtime_dir.clone()).unwrap(),
PathBuf::from("/sand/run/workers.json") PathBuf::from("/sand/run/workers.json")
); );
assert_eq!( assert_eq!(
@ -404,7 +404,7 @@ mod tests {
PathBuf::from("/sand/run/foo") PathBuf::from("/sand/run/foo")
); );
assert_eq!( assert_eq!(
pod_socket_path_from_runtime_dir(runtime_dir, "foo").unwrap(), worker_socket_path_from_runtime_dir(runtime_dir, "foo").unwrap(),
PathBuf::from("/sand/run/foo/sock") PathBuf::from("/sand/run/foo/sock")
); );
} }

View File

@ -266,7 +266,7 @@ impl Scope {
/// Allow rules with their targets resolved to absolute paths. /// Allow rules with their targets resolved to absolute paths.
/// ///
/// Used by the pod-registry, where every Worker's allocation /// Used by the worker-allocation, where every Worker's allocation
/// must be expressed in absolute terms so prefix comparisons are /// must be expressed in absolute terms so prefix comparisons are
/// meaningful across processes. /// meaningful across processes.
pub fn allow_rules(&self) -> Vec<ScopeRule> { pub fn allow_rules(&self) -> Vec<ScopeRule> {

View File

@ -1,17 +0,0 @@
[package]
name = "pod-registry"
version = "0.1.0"
edition.workspace = true
license.workspace = true
[dependencies]
fs4 = { workspace = true, features = ["sync"] }
libc = { workspace = true }
manifest = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
session-store = { workspace = true }
thiserror = { workspace = true }
[dev-dependencies]
tempfile = { workspace = true }

View File

@ -1,30 +0,0 @@
# pod-registry
## Role
`pod-registry` is the legacy-named crate that tracks live Worker process ownership and delegated scope locks at runtime.
## Boundaries
Owns:
- machine-local live Worker registration
- collision detection for running Worker names
- delegated scope lock bookkeeping
- registry cleanup hooks for stopped or unreachable children
Does not own:
- durable Worker metadata (`pod-store`)
- replayable session logs (`session-store`)
- socket protocol definitions (`protocol`)
- project work item state
## Design notes
The registry is a runtime coordination mechanism. It can help decide whether a Worker is live or colliding, but durable visibility/restoration should be backed by Worker metadata when possible.
## See also
- [`../../docs/design/worker-session-state.md`](../../docs/design/worker-session-state.md)
- [`../../docs/design/tool-permissions-scope.md`](../../docs/design/tool-permissions-scope.md)

View File

@ -1,37 +0,0 @@
//! Machine-wide Worker allocation registry.
//!
//! A single JSON file at `<runtime_dir>/workers.json` records every live
//! Worker's allocation (see [`manifest::paths::pod_registry_path`] for
//! how the path is resolved). File-level `flock(2)` serialises access
//! across processes so spawn sequences from unrelated Workers can't race.
//!
//! Each Worker, when starting, acquires the lock, reclaims stale entries
//! (Workers whose PID has died), checks that its requested write scope
//! does not overlap any other allocation's effective write scope, and
//! registers itself. When it exits normally, it removes its entry and
//! returns delegated scope to its `delegated_from` parent. Crash
//! recovery rides on the next Worker that opens the file — no background
//! reaper.
mod conflict;
mod error;
mod lifecycle;
mod mutate;
mod table;
#[cfg(test)]
mod test_util;
pub use conflict::{
ConflictOwner, find_conflict_owner, find_conflict_owners, is_within_effective_write,
};
pub use error::ScopeLockError;
pub use lifecycle::{
ScopeAllocationGuard, SegmentLockInfo, adopt_allocation, install_top_level,
install_top_level_with_deny, lookup_segment, update_segment,
};
pub use mutate::{
delegate_scope, reclaim_delegated_scope, reclaim_stale, reclaim_stale_with, register_worker,
register_worker_with_deny, release_worker,
};
pub use table::{Allocation, LockFile, LockFileGuard, default_registry_path};

View File

@ -1,15 +0,0 @@
[package]
name = "pod-store"
description = "Legacy-named durable Worker metadata/state persistence"
version = "0.1.0"
edition.workspace = true
license.workspace = true
[dependencies]
session-store = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
thiserror = { workspace = true }
[dev-dependencies]
tempfile = { workspace = true }

View File

@ -1,30 +0,0 @@
# pod-store
## Role
`pod-store` is the legacy-named crate that owns current Worker metadata keyed by Worker name.
## Boundaries
Owns:
- persisted Worker metadata files
- current active/pending session pointers
- resolved manifest snapshots for restoration
- parent-visible spawned-child metadata
- restoration labels and diagnostics derived from metadata
Does not own:
- replayable conversation logs (`session-store`)
- live process locks or socket reachability (`pod-registry`, `client`)
- product CLI behavior (`yoi`)
- model turn execution (`llm-engine`)
## Design notes
Worker metadata is intentionally thin. It should answer current-state questions without duplicating transcripts or becoming a second session log.
## See also
- [`../../docs/design/worker-session-state.md`](../../docs/design/worker-session-state.md)

View File

@ -17,7 +17,7 @@ Does not own:
- Unix socket implementation details (`client`, `worker`) - Unix socket implementation details (`client`, `worker`)
- TUI rendering (`tui`) - TUI rendering (`tui`)
- Engine history semantics (`llm-engine`) - Engine history semantics (`llm-engine`)
- durable storage (`session-store`, `pod-store`) - durable storage (`session-store`, `session-store` worker metadata)
## Design notes ## Design notes

View File

@ -99,7 +99,7 @@ pub enum Method {
/// ///
/// Delivered as `Method::WorkerEvent` over the parent's Unix socket. The /// Delivered as `Method::WorkerEvent` over the parent's Unix socket. The
/// parent Controller always applies variant-specific side effects /// parent Controller always applies variant-specific side effects
/// (registry / pod-registry updates). Agent-visible variants are also /// (registry / worker-allocation updates). Agent-visible variants are also
/// queued into the notification buffer; control-plane-only variants are /// queued into the notification buffer; control-plane-only variants are
/// not injected into the parent's LLM context. /// not injected into the parent's LLM context.
/// ///

View File

@ -15,8 +15,8 @@ Owns:
Does not own: Does not own:
- current Worker-name metadata (`pod-store`) - current Worker-name metadata (`session-store` worker metadata)
- live process/socket discovery (`pod-registry`, `client`) - live process/socket discovery (`worker-allocation`, `client`)
- UI state (`tui`) - UI state (`tui`)
- generated memory summaries (`memory`) - generated memory summaries (`memory`)

View File

@ -37,6 +37,7 @@ pub mod segment;
pub mod segment_log; pub mod segment_log;
pub mod store; pub mod store;
pub mod system_item; pub mod system_item;
pub mod worker_metadata;
pub use event_trace::{TraceEntry, TracePayload}; pub use event_trace::{TraceEntry, TracePayload};
pub use fs_store::FsStore; pub use fs_store::FsStore;
@ -52,6 +53,11 @@ pub use segment::{
pub use segment_log::{LogEntry, RestoredState, SegmentOrigin, collect_state}; pub use segment_log::{LogEntry, RestoredState, SegmentOrigin, collect_state};
pub use store::{Store, StoreError}; pub use store::{Store, StoreError};
pub use system_item::{SystemItem, SystemReminder, SystemReminderSource, render_worker_event}; pub use system_item::{SystemItem, SystemReminder, SystemReminderSource, render_worker_event};
pub use worker_metadata::{
CombinedStore, FsWorkerStore, WorkerActiveSegmentRef, WorkerMetadata, WorkerMetadataStore,
WorkerPeer, WorkerReclaimedChild, WorkerSpawnedChild, WorkerSpawnedScopeRule, WorkerStoreError,
validate_worker_name,
};
/// Session identifier — the fork-tree root. UUID v7 (time-ordered). /// Session identifier — the fork-tree root. UUID v7 (time-ordered).
/// ///

View File

@ -12,8 +12,8 @@
//! model remain session JSONL history. Socket and callback paths are last-known //! model remain session JSONL history. Socket and callback paths are last-known
//! runtime hints, not proof of liveness. //! runtime hints, not proof of liveness.
use crate::{SegmentId, SessionId};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use session_store::{SegmentId, SessionId};
use std::fs; use std::fs;
use std::path::PathBuf; use std::path::PathBuf;
@ -26,8 +26,8 @@ pub enum WorkerStoreError {
#[error("serialization error: {0}")] #[error("serialization error: {0}")]
Serde(#[from] serde_json::Error), Serde(#[from] serde_json::Error),
#[error("invalid pod name: {0}")] #[error("invalid worker name: {0}")]
InvalidPodName(String), InvalidWorkerName(String),
} }
/// Active Session/Segment pointer for a Worker. /// Active Session/Segment pointer for a Worker.
@ -284,13 +284,13 @@ impl FsWorkerStore {
Ok(Self { root }) Ok(Self { root })
} }
fn pod_dir(&self, worker_name: &str) -> Result<PathBuf, WorkerStoreError> { fn worker_dir(&self, worker_name: &str) -> Result<PathBuf, WorkerStoreError> {
validate_worker_name(worker_name)?; validate_worker_name(worker_name)?;
Ok(self.root.join(worker_name)) Ok(self.root.join(worker_name))
} }
fn metadata_path(&self, worker_name: &str) -> Result<PathBuf, WorkerStoreError> { fn metadata_path(&self, worker_name: &str) -> Result<PathBuf, WorkerStoreError> {
Ok(self.pod_dir(worker_name)?.join("metadata.json")) Ok(self.worker_dir(worker_name)?.join("metadata.json"))
} }
} }
@ -364,7 +364,7 @@ pub fn validate_worker_name(worker_name: &str) -> Result<(), WorkerStoreError> {
|| worker_name.contains('/') || worker_name.contains('/')
|| worker_name.contains('\0') || worker_name.contains('\0')
{ {
return Err(WorkerStoreError::InvalidPodName(worker_name.to_string())); return Err(WorkerStoreError::InvalidWorkerName(worker_name.to_string()));
} }
Ok(()) Ok(())
} }
@ -372,61 +372,58 @@ pub fn validate_worker_name(worker_name: &str) -> Result<(), WorkerStoreError> {
/// Convenience composition for callers that want one handle carrying separate /// Convenience composition for callers that want one handle carrying separate
/// session-log and Worker-state roots. /// session-log and Worker-state roots.
#[derive(Clone)] #[derive(Clone)]
pub struct CombinedStore<S, P> { pub struct CombinedStore<S, W> {
pub session_store: S, pub session_store: S,
pub pod_store: P, pub worker_metadata_store: W,
} }
impl<S, P> CombinedStore<S, P> { impl<S, W> CombinedStore<S, W> {
pub fn new(session_store: S, pod_store: P) -> Self { pub fn new(session_store: S, worker_metadata_store: W) -> Self {
Self { Self {
session_store, session_store,
pod_store, worker_metadata_store,
} }
} }
} }
impl<S, P> session_store::Store for CombinedStore<S, P> impl<S, W> crate::Store for CombinedStore<S, W>
where where
S: session_store::Store, S: crate::Store,
P: Send + Sync, W: Send + Sync,
{ {
fn append( fn append(
&self, &self,
session_id: SessionId, session_id: SessionId,
segment_id: SegmentId, segment_id: SegmentId,
entry: &session_store::LogEntry, entry: &crate::LogEntry,
) -> Result<(), session_store::StoreError> { ) -> Result<(), crate::StoreError> {
self.session_store.append(session_id, segment_id, entry) self.session_store.append(session_id, segment_id, entry)
} }
fn read_all( fn read_all(
&self, &self,
session_id: SessionId, session_id: SessionId,
segment_id: SegmentId, segment_id: SegmentId,
) -> Result<Vec<session_store::LogEntry>, session_store::StoreError> { ) -> Result<Vec<crate::LogEntry>, crate::StoreError> {
self.session_store.read_all(session_id, segment_id) self.session_store.read_all(session_id, segment_id)
} }
fn list_sessions(&self) -> Result<Vec<SessionId>, session_store::StoreError> { fn list_sessions(&self) -> Result<Vec<SessionId>, crate::StoreError> {
self.session_store.list_sessions() self.session_store.list_sessions()
} }
fn list_segments( fn list_segments(&self, session_id: SessionId) -> Result<Vec<SegmentId>, crate::StoreError> {
&self,
session_id: SessionId,
) -> Result<Vec<SegmentId>, session_store::StoreError> {
self.session_store.list_segments(session_id) self.session_store.list_segments(session_id)
} }
fn lookup_session_of( fn lookup_session_of(
&self, &self,
segment_id: SegmentId, segment_id: SegmentId,
) -> Result<Option<SessionId>, session_store::StoreError> { ) -> Result<Option<SessionId>, crate::StoreError> {
self.session_store.lookup_session_of(segment_id) self.session_store.lookup_session_of(segment_id)
} }
fn create_segment( fn create_segment(
&self, &self,
session_id: SessionId, session_id: SessionId,
segment_id: SegmentId, segment_id: SegmentId,
entries: &[session_store::LogEntry], entries: &[crate::LogEntry],
) -> Result<(), session_store::StoreError> { ) -> Result<(), crate::StoreError> {
self.session_store self.session_store
.create_segment(session_id, segment_id, entries) .create_segment(session_id, segment_id, entries)
} }
@ -434,7 +431,7 @@ where
&self, &self,
session_id: SessionId, session_id: SessionId,
segment_id: SegmentId, segment_id: SegmentId,
) -> Result<bool, session_store::StoreError> { ) -> Result<bool, crate::StoreError> {
self.session_store.exists(session_id, segment_id) self.session_store.exists(session_id, segment_id)
} }
fn truncate( fn truncate(
@ -442,7 +439,7 @@ where
session_id: SessionId, session_id: SessionId,
segment_id: SegmentId, segment_id: SegmentId,
entries_len: usize, entries_len: usize,
) -> Result<(), session_store::StoreError> { ) -> Result<(), crate::StoreError> {
self.session_store self.session_store
.truncate(session_id, segment_id, entries_len) .truncate(session_id, segment_id, entries_len)
} }
@ -450,39 +447,39 @@ where
&self, &self,
session_id: SessionId, session_id: SessionId,
segment_id: SegmentId, segment_id: SegmentId,
) -> Result<usize, session_store::StoreError> { ) -> Result<usize, crate::StoreError> {
self.session_store.read_entry_count(session_id, segment_id) self.session_store.read_entry_count(session_id, segment_id)
} }
fn append_trace( fn append_trace(
&self, &self,
session_id: SessionId, session_id: SessionId,
segment_id: SegmentId, segment_id: SegmentId,
entry: &session_store::TraceEntry, entry: &crate::TraceEntry,
) -> Result<(), session_store::StoreError> { ) -> Result<(), crate::StoreError> {
self.session_store self.session_store
.append_trace(session_id, segment_id, entry) .append_trace(session_id, segment_id, entry)
} }
} }
impl<S, P> WorkerMetadataStore for CombinedStore<S, P> impl<S, W> WorkerMetadataStore for CombinedStore<S, W>
where where
S: Send + Sync, S: Send + Sync,
P: WorkerMetadataStore, W: WorkerMetadataStore,
{ {
fn write(&self, metadata: &WorkerMetadata) -> Result<(), WorkerStoreError> { fn write(&self, metadata: &WorkerMetadata) -> Result<(), WorkerStoreError> {
self.pod_store.write(metadata) self.worker_metadata_store.write(metadata)
} }
fn read_by_name(&self, worker_name: &str) -> Result<Option<WorkerMetadata>, WorkerStoreError> { fn read_by_name(&self, worker_name: &str) -> Result<Option<WorkerMetadata>, WorkerStoreError> {
self.pod_store.read_by_name(worker_name) self.worker_metadata_store.read_by_name(worker_name)
} }
fn list_names(&self) -> Result<Vec<String>, WorkerStoreError> { fn list_names(&self) -> Result<Vec<String>, WorkerStoreError> {
self.pod_store.list_names() self.worker_metadata_store.list_names()
} }
fn root_dir(&self) -> Option<PathBuf> { fn root_dir(&self) -> Option<PathBuf> {
self.pod_store.root_dir() self.worker_metadata_store.root_dir()
} }
fn delete_by_name(&self, worker_name: &str) -> Result<(), WorkerStoreError> { fn delete_by_name(&self, worker_name: &str) -> Result<(), WorkerStoreError> {
self.pod_store.delete_by_name(worker_name) self.worker_metadata_store.delete_by_name(worker_name)
} }
} }
@ -491,15 +488,15 @@ mod tests {
use super::*; use super::*;
#[test] #[test]
fn pod_metadata_manifest_snapshot_roundtrips() { fn worker_metadata_manifest_snapshot_roundtrips() {
let mut metadata = WorkerMetadata::new( let mut metadata = WorkerMetadata::new(
"profile-pod", "profile-worker",
Some(WorkerActiveSegmentRef::pending_segment( Some(WorkerActiveSegmentRef::pending_segment(
session_store::new_session_id(), crate::new_session_id(),
)), )),
); );
metadata.resolved_manifest_snapshot = Some(serde_json::json!({ metadata.resolved_manifest_snapshot = Some(serde_json::json!({
"pod": { "name": "profile-pod" }, "worker": { "name": "profile-worker" },
"profile": { "source": { "kind": "path", "path": "/profiles/coder.lua" } } "profile": { "source": { "kind": "path", "path": "/profiles/coder.lua" } }
})); }));
@ -510,22 +507,22 @@ mod tests {
} }
#[test] #[test]
fn fs_store_writes_under_pod_state_root_only() { fn fs_store_writes_under_worker_state_root_only() {
let tmp = tempfile::TempDir::new().unwrap(); let tmp = tempfile::TempDir::new().unwrap();
let session_root = tmp.path().join("sessions"); let session_root = tmp.path().join("sessions");
let pod_root = tmp.path().join("workers"); let worker_root = tmp.path().join("workers");
fs::create_dir_all(&session_root).unwrap(); fs::create_dir_all(&session_root).unwrap();
let store = FsWorkerStore::new(&pod_root).unwrap(); let store = FsWorkerStore::new(&worker_root).unwrap();
store store
.write(&WorkerMetadata::new( .write(&WorkerMetadata::new(
"agent", "agent",
Some(WorkerActiveSegmentRef::pending_segment( Some(WorkerActiveSegmentRef::pending_segment(
session_store::new_session_id(), crate::new_session_id(),
)), )),
)) ))
.unwrap(); .unwrap();
assert!(pod_root.join("agent/metadata.json").exists()); assert!(worker_root.join("agent/metadata.json").exists());
assert!(!session_root.join("workers/agent/metadata.json").exists()); assert!(!session_root.join("workers/agent/metadata.json").exists());
} }
@ -540,16 +537,16 @@ mod tests {
scope_delegated: vec![], scope_delegated: vec![],
callback_address: std::path::Path::new("/tmp/parent.sock").into(), callback_address: std::path::Path::new("/tmp/parent.sock").into(),
}); });
metadata.resolved_manifest_snapshot = Some(serde_json::json!({"pod":{"name":"agent"}})); metadata.resolved_manifest_snapshot = Some(serde_json::json!({"worker":{"name":"agent"}}));
store.write(&metadata).unwrap(); store.write(&metadata).unwrap();
let snapshot = serde_json::json!({"pod":{"name":"updated"}}); let snapshot = serde_json::json!({"worker":{"name":"updated"}});
store store
.set_active( .set_active(
"agent", "agent",
Some(WorkerActiveSegmentRef::active_segment( Some(WorkerActiveSegmentRef::active_segment(
session_store::new_session_id(), crate::new_session_id(),
session_store::new_segment_id(), crate::new_segment_id(),
)), )),
Some(snapshot.clone()), Some(snapshot.clone()),
) )
@ -564,10 +561,10 @@ mod tests {
let tmp = tempfile::TempDir::new().unwrap(); let tmp = tempfile::TempDir::new().unwrap();
let store = FsWorkerStore::new(tmp.path()).unwrap(); let store = FsWorkerStore::new(tmp.path()).unwrap();
let active = WorkerActiveSegmentRef::active_segment( let active = WorkerActiveSegmentRef::active_segment(
session_store::new_session_id(), crate::new_session_id(),
session_store::new_segment_id(), crate::new_segment_id(),
); );
let snapshot = serde_json::json!({"pod":{"name":"agent"}}); let snapshot = serde_json::json!({"worker":{"name":"agent"}});
store store
.set_active("agent", Some(active.clone()), Some(snapshot.clone())) .set_active("agent", Some(active.clone()), Some(snapshot.clone()))
.unwrap(); .unwrap();
@ -592,10 +589,10 @@ mod tests {
let tmp = tempfile::TempDir::new().unwrap(); let tmp = tempfile::TempDir::new().unwrap();
let store = FsWorkerStore::new(tmp.path()).unwrap(); let store = FsWorkerStore::new(tmp.path()).unwrap();
let active = WorkerActiveSegmentRef::active_segment( let active = WorkerActiveSegmentRef::active_segment(
session_store::new_session_id(), crate::new_session_id(),
session_store::new_segment_id(), crate::new_segment_id(),
); );
let snapshot = serde_json::json!({"pod":{"name":"agent"}}); let snapshot = serde_json::json!({"worker":{"name":"agent"}});
store store
.set_active("agent", Some(active.clone()), Some(snapshot.clone())) .set_active("agent", Some(active.clone()), Some(snapshot.clone()))
.unwrap(); .unwrap();

View File

@ -22,8 +22,7 @@ toml = { workspace = true }
manifest = { workspace = true } manifest = { workspace = true }
secrets = { workspace = true } secrets = { workspace = true }
session-store = { workspace = true } session-store = { workspace = true }
pod-store = { workspace = true } fs4 = { workspace = true }
pod-registry = { workspace = true }
provider = { workspace = true } provider = { workspace = true }
ticket = { workspace = true } ticket = { workspace = true }
serde = { workspace = true, features = ["derive"] } serde = { workspace = true, features = ["derive"] }

View File

@ -16,7 +16,7 @@ Owns:
Does not own: Does not own:
- durable transcript authority (`session-store`) - durable transcript authority (`session-store`)
- Worker current state (`pod-store`) - Worker current state (`session-store` worker metadata)
- Worker lifecycle policy (`worker`) - Worker lifecycle policy (`worker`)
- product CLI ownership (`yoi`) - product CLI ownership (`yoi`)

View File

@ -132,7 +132,7 @@ fn resolve_socket(worker_name: &str, override_path: Option<PathBuf>) -> PathBuf
if let Some(p) = override_path { if let Some(p) = override_path {
return p; return p;
} }
manifest::paths::pod_socket_path(worker_name).unwrap_or_else(|| { manifest::paths::worker_socket_path(worker_name).unwrap_or_else(|| {
PathBuf::from("/tmp") PathBuf::from("/tmp")
.join("yoi") .join("yoi")
.join(worker_name) .join(worker_name)
@ -317,7 +317,7 @@ async fn connect_live_pod(
if !allow_registry_fallback { if !allow_registry_fallback {
return None; return None;
} }
let registry_socket = picker::live_socket_for_pod(worker_name)?; let registry_socket = picker::live_socket_for_worker(worker_name)?;
if registry_socket == preferred_socket { if registry_socket == preferred_socket {
return None; return None;
} }

View File

@ -19,7 +19,6 @@ use crossterm::event::{
Event as TermEvent, KeyCode, KeyEvent, KeyModifiers, MouseButton, MouseEvent, MouseEventKind, Event as TermEvent, KeyCode, KeyEvent, KeyModifiers, MouseButton, MouseEvent, MouseEventKind,
poll, read, poll, read,
}; };
use pod_store::FsWorkerStore;
use protocol::stream::{JsonLineReader, JsonLineWriter}; use protocol::stream::{JsonLineReader, JsonLineWriter};
use protocol::{ErrorCode, Event, Method, Segment, WorkerStatus}; use protocol::{ErrorCode, Event, Method, Segment, WorkerStatus};
use ratatui::Frame; use ratatui::Frame;
@ -31,6 +30,7 @@ use ratatui::text::{Line, Span};
use ratatui::widgets::{Block, Borders, Clear, Paragraph, Widget, Wrap}; use ratatui::widgets::{Block, Borders, Clear, Paragraph, Widget, Wrap};
use serde::Serialize; use serde::Serialize;
use session_store::FsStore; use session_store::FsStore;
use session_store::FsWorkerStore;
use ticket::config::{GitBranchName, TicketConfig, TicketOrchestrationConfig}; use ticket::config::{GitBranchName, TicketConfig, TicketOrchestrationConfig};
use ticket::{ use ticket::{
LocalTicketBackend, MarkdownText, TicketBackend, TicketIdOrSlug, TicketStateChange, LocalTicketBackend, MarkdownText, TicketBackend, TicketIdOrSlug, TicketStateChange,
@ -46,7 +46,7 @@ use crate::role_session_registry::{
}; };
use crate::worker_list::{ use crate::worker_list::{
StoredMetadataState, WorkerList, WorkerListEntry, WorkerVisibilitySource, StoredMetadataState, WorkerList, WorkerListEntry, WorkerVisibilitySource,
read_reachable_live_pod_infos, read_stored_worker_infos, read_reachable_live_worker_infos, read_stored_worker_infos,
}; };
#[cfg(not(feature = "e2e-test"))] #[cfg(not(feature = "e2e-test"))]
use crate::workspace_panel::build_workspace_panel; use crate::workspace_panel::build_workspace_panel;
@ -522,9 +522,9 @@ fn default_store_dir() -> Result<PathBuf, DashboardError> {
}) })
} }
fn default_pod_store_dir() -> Result<PathBuf, DashboardError> { fn default_worker_metadata_dir() -> Result<PathBuf, DashboardError> {
manifest::paths::data_dir() manifest::paths::data_dir()
.map(|dir| dir.join("pods")) .map(|dir| dir.join("workers"))
.ok_or_else(|| { .ok_or_else(|| {
DashboardError::Io(io::Error::new( DashboardError::Io(io::Error::new(
io::ErrorKind::NotFound, io::ErrorKind::NotFound,
@ -3423,9 +3423,10 @@ async fn load_worker_list(
) -> Result<WorkerList, DashboardError> { ) -> Result<WorkerList, DashboardError> {
let store_dir = default_store_dir()?; let store_dir = default_store_dir()?;
let store = FsStore::new(&store_dir)?; let store = FsStore::new(&store_dir)?;
let pod_store = FsWorkerStore::new(default_pod_store_dir()?).map_err(io::Error::other)?; let worker_metadata_store =
let stored = read_stored_worker_infos(&store, &pod_store)?; FsWorkerStore::new(default_worker_metadata_dir()?).map_err(io::Error::other)?;
let live = read_reachable_live_pod_infos(&store) let stored = read_stored_worker_infos(&store, &worker_metadata_store)?;
let live = read_reachable_live_worker_infos(&store)
.await .await
.unwrap_or_default(); .unwrap_or_default();
Ok(WorkerList::from_workspace_sources( Ok(WorkerList::from_workspace_sources(

View File

@ -1,7 +1,7 @@
//! Inline-viewport "pick a Worker to attach or restore" UX. //! Inline-viewport "pick a Worker to attach or restore" UX.
//! //!
//! Reads live Worker allocations from the runtime registry and stopped Worker state //! Reads live Worker allocations from the runtime registry and stopped Worker state
//! from the pod-store name-keyed metadata. Picking a live row attaches to //! from the session-store worker metadata name-keyed metadata. Picking a live row attaches to
//! its socket; picking a stopped row restores via the Worker runtime command. //! its socket; picking a stopped row restores via the Worker runtime command.
use std::io; use std::io;
@ -9,7 +9,6 @@ use std::path::PathBuf;
use std::time::Duration; use std::time::Duration;
use crossterm::event::{self, Event as TermEvent, KeyCode, KeyEventKind, KeyModifiers}; use crossterm::event::{self, Event as TermEvent, KeyCode, KeyEventKind, KeyModifiers};
use pod_store::FsWorkerStore;
use ratatui::Terminal; use ratatui::Terminal;
use ratatui::backend::CrosstermBackend; use ratatui::backend::CrosstermBackend;
use ratatui::layout::{Constraint, Layout}; use ratatui::layout::{Constraint, Layout};
@ -18,11 +17,12 @@ use ratatui::text::{Line, Span};
use ratatui::widgets::Paragraph; use ratatui::widgets::Paragraph;
use ratatui::{Frame, TerminalOptions, Viewport}; use ratatui::{Frame, TerminalOptions, Viewport};
use session_store::FsStore; use session_store::FsStore;
use session_store::FsWorkerStore;
use crate::worker_list::{ use crate::worker_list::{
LiveWorkerInfo, StoredMetadataState, StoredWorkerInfo, WorkerList, WorkerListEntry, LiveWorkerInfo, StoredMetadataState, StoredWorkerInfo, WorkerList, WorkerListEntry,
WorkerVisibilitySource, live_socket_for_pod as worker_list_live_socket_for_pod, WorkerVisibilitySource, live_socket_for_worker as worker_list_live_socket_for_worker,
read_reachable_live_pod_infos, read_stored_worker_infos, read_reachable_live_worker_infos, read_stored_worker_infos,
}; };
const MAX_ROWS: usize = 10; const MAX_ROWS: usize = 10;
@ -156,9 +156,10 @@ fn list_for_options(
pub async fn run(options: PickerOptions) -> Result<PickerOutcome, PickerError> { pub async fn run(options: PickerOptions) -> Result<PickerOutcome, PickerError> {
let store_dir = default_store_dir()?; let store_dir = default_store_dir()?;
let store = FsStore::new(&store_dir)?; let store = FsStore::new(&store_dir)?;
let pod_store = FsWorkerStore::new(default_pod_store_dir()?).map_err(io::Error::other)?; let worker_metadata_store =
let stored_workers = read_stored_worker_infos(&store, &pod_store)?; FsWorkerStore::new(default_worker_metadata_dir()?).map_err(io::Error::other)?;
let live_workers = read_reachable_live_pod_infos(&store) let stored_workers = read_stored_worker_infos(&store, &worker_metadata_store)?;
let live_workers = read_reachable_live_worker_infos(&store)
.await .await
.unwrap_or_default(); .unwrap_or_default();
let mut list = list_for_options(&options, stored_workers, live_workers); let mut list = list_for_options(&options, stored_workers, live_workers);
@ -223,9 +224,9 @@ fn default_store_dir() -> Result<PathBuf, PickerError> {
}) })
} }
fn default_pod_store_dir() -> Result<PathBuf, PickerError> { fn default_worker_metadata_dir() -> Result<PathBuf, PickerError> {
manifest::paths::data_dir() manifest::paths::data_dir()
.map(|dir| dir.join("pods")) .map(|dir| dir.join("workers"))
.ok_or_else(|| { .ok_or_else(|| {
PickerError::Io(io::Error::new( PickerError::Io(io::Error::new(
io::ErrorKind::NotFound, io::ErrorKind::NotFound,
@ -235,8 +236,8 @@ fn default_pod_store_dir() -> Result<PathBuf, PickerError> {
}) })
} }
pub(crate) fn live_socket_for_pod(worker_name: &str) -> Option<PathBuf> { pub(crate) fn live_socket_for_worker(worker_name: &str) -> Option<PathBuf> {
worker_list_live_socket_for_pod(worker_name) worker_list_live_socket_for_worker(worker_name)
} }
fn make_inline_terminal() -> io::Result<Terminal<CrosstermBackend<io::Stdout>>> { fn make_inline_terminal() -> io::Result<Terminal<CrosstermBackend<io::Stdout>>> {

View File

@ -1,13 +1,15 @@
use std::collections::{BTreeMap, BTreeSet}; use std::collections::{BTreeMap, BTreeSet};
use std::io; use std::fs::File;
use std::io::{self, Read};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::time::Duration; use std::time::Duration;
use client::WorkerClient; use client::WorkerClient;
use pod_registry::{LockFileGuard, default_registry_path}; use manifest::paths;
use pod_store::{WorkerActiveSegmentRef, WorkerMetadata, WorkerMetadataStore};
use protocol::{Event, WorkerStatus}; use protocol::{Event, WorkerStatus};
use serde::Deserialize;
use session_store::{FsStore, SegmentId, SessionId}; use session_store::{FsStore, SegmentId, SessionId};
use session_store::{WorkerActiveSegmentRef, WorkerMetadata, WorkerMetadataStore};
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub(crate) struct WorkerList { pub(crate) struct WorkerList {
@ -314,11 +316,14 @@ pub(crate) enum WorkerEntryDiagnosticKind {
pub(crate) fn read_stored_worker_infos( pub(crate) fn read_stored_worker_infos(
store: &FsStore, store: &FsStore,
pod_store: &impl WorkerMetadataStore, worker_metadata_store: &impl WorkerMetadataStore,
) -> Result<Vec<StoredWorkerInfo>, io::Error> { ) -> Result<Vec<StoredWorkerInfo>, io::Error> {
let mut records = Vec::new(); let mut records = Vec::new();
for worker_name in pod_store.list_names().map_err(io::Error::other)? { for worker_name in worker_metadata_store
let info = match pod_store.read_by_name(&worker_name) { .list_names()
.map_err(io::Error::other)?
{
let info = match worker_metadata_store.read_by_name(&worker_name) {
Ok(Some(metadata)) => stored_info_from_metadata(store, worker_name, metadata), Ok(Some(metadata)) => stored_info_from_metadata(store, worker_name, metadata),
Ok(None) => corrupt_stored_info( Ok(None) => corrupt_stored_info(
worker_name, worker_name,
@ -331,16 +336,24 @@ pub(crate) fn read_stored_worker_infos(
Ok(records) Ok(records)
} }
pub(crate) fn read_live_pod_infos() -> Result<Vec<LiveWorkerInfo>, io::Error> { pub(crate) fn read_live_worker_infos() -> Result<Vec<LiveWorkerInfo>, io::Error> {
let path = default_registry_path()?; let path = paths::worker_allocation_path().ok_or_else(|| {
let guard = LockFileGuard::open(&path)?; io::Error::new(
Ok(guard io::ErrorKind::NotFound,
.data() "could not resolve worker allocation path",
)
})?;
let table = match read_worker_allocation_table(&path) {
Ok(table) => table,
Err(err) if err.kind() == io::ErrorKind::NotFound => return Ok(Vec::new()),
Err(err) => return Err(err),
};
Ok(table
.allocations .allocations
.iter() .into_iter()
.map(|allocation| LiveWorkerInfo { .map(|allocation| LiveWorkerInfo {
worker_name: allocation.worker_name.clone(), worker_name: allocation.worker_name,
socket_path: allocation.socket.clone(), socket_path: allocation.socket,
status: None, status: None,
reachable: false, reachable: false,
segment_id: allocation.segment_id, segment_id: allocation.segment_id,
@ -349,20 +362,49 @@ pub(crate) fn read_live_pod_infos() -> Result<Vec<LiveWorkerInfo>, io::Error> {
.collect()) .collect())
} }
pub(crate) async fn read_reachable_live_pod_infos( fn read_worker_allocation_table(path: &Path) -> Result<WorkerAllocationTable, io::Error> {
store: &FsStore, let mut file = File::open(path)?;
) -> Result<Vec<LiveWorkerInfo>, io::Error> { fs4::fs_std::FileExt::lock_shared(&file)?;
let records = read_live_pod_infos()?; let mut contents = String::new();
probe_reachable_live_pod_infos(store, records).await let read_result = file.read_to_string(&mut contents);
let unlock_result = fs4::fs_std::FileExt::unlock(&file);
read_result?;
unlock_result?;
if contents.trim().is_empty() {
return Ok(WorkerAllocationTable::default());
}
serde_json::from_str(&contents).map_err(io::Error::other)
} }
async fn probe_reachable_live_pod_infos( #[derive(Debug, Default, Deserialize)]
struct WorkerAllocationTable {
#[serde(default)]
allocations: Vec<WorkerAllocationRecord>,
}
#[derive(Debug, Deserialize)]
struct WorkerAllocationRecord {
worker_name: String,
socket: PathBuf,
#[serde(default)]
segment_id: Option<SegmentId>,
}
pub(crate) async fn read_reachable_live_worker_infos(
store: &FsStore,
) -> Result<Vec<LiveWorkerInfo>, io::Error> {
let records = read_live_worker_infos()?;
probe_reachable_live_worker_infos(store, records).await
}
async fn probe_reachable_live_worker_infos(
_store: &FsStore, _store: &FsStore,
records: Vec<LiveWorkerInfo>, records: Vec<LiveWorkerInfo>,
) -> Result<Vec<LiveWorkerInfo>, io::Error> { ) -> Result<Vec<LiveWorkerInfo>, io::Error> {
let mut handles = Vec::with_capacity(records.len()); let mut handles = Vec::with_capacity(records.len());
for record in records { for record in records {
handles.push(tokio::spawn(probe_live_pod_info(record))); handles.push(tokio::spawn(probe_live_worker_info(record)));
} }
let mut reachable = Vec::with_capacity(handles.len()); let mut reachable = Vec::with_capacity(handles.len());
@ -378,15 +420,15 @@ async fn probe_reachable_live_pod_infos(
Ok(reachable) Ok(reachable)
} }
async fn probe_live_pod_info(mut record: LiveWorkerInfo) -> Result<LiveWorkerInfo, io::Error> { async fn probe_live_worker_info(mut record: LiveWorkerInfo) -> Result<LiveWorkerInfo, io::Error> {
let status = probe_live_status(&record.socket_path).await?; let status = probe_live_status(&record.socket_path).await?;
record.reachable = true; record.reachable = true;
record.status = status; record.status = status;
Ok(record) Ok(record)
} }
pub(crate) fn live_socket_for_pod(worker_name: &str) -> Option<PathBuf> { pub(crate) fn live_socket_for_worker(worker_name: &str) -> Option<PathBuf> {
read_live_pod_infos() read_live_worker_infos()
.ok()? .ok()?
.into_iter() .into_iter()
.find(|worker| worker.worker_name == worker_name) .find(|worker| worker.worker_name == worker_name)
@ -560,10 +602,10 @@ mod tests {
use std::sync::Arc; use std::sync::Arc;
use llm_engine::llm_client::types::RequestConfig; use llm_engine::llm_client::types::RequestConfig;
use pod_store::FsWorkerStore;
use pod_store::{WorkerActiveSegmentRef, WorkerMetadataStore};
use protocol::stream::JsonLineWriter; use protocol::stream::JsonLineWriter;
use session_store::FsWorkerStore;
use session_store::{LogEntry, Store, new_segment_id, new_session_id}; use session_store::{LogEntry, Store, new_segment_id, new_session_id};
use session_store::{WorkerActiveSegmentRef, WorkerMetadataStore};
use tempfile::tempdir; use tempfile::tempdir;
use tokio::net::UnixListener; use tokio::net::UnixListener;
use tokio::sync::Barrier; use tokio::sync::Barrier;
@ -877,7 +919,7 @@ mod tests {
let records = tokio::time::timeout( let records = tokio::time::timeout(
LIVE_STATUS_PROBE_TIMEOUT * 3, LIVE_STATUS_PROBE_TIMEOUT * 3,
probe_reachable_live_pod_infos(&store, records), probe_reachable_live_worker_infos(&store, records),
) )
.await .await
.expect("status probes should complete") .expect("status probes should complete")
@ -907,7 +949,7 @@ mod tests {
std::future::pending::<()>().await; std::future::pending::<()>().await;
}); });
let records = probe_reachable_live_pod_infos( let records = probe_reachable_live_worker_infos(
&store, &store,
vec![live_probe_record("silent", socket_path.clone())], vec![live_probe_record("silent", socket_path.clone())],
) )
@ -985,12 +1027,12 @@ mod tests {
fn read_stored_worker_infos_reports_corrupt_metadata() { fn read_stored_worker_infos_reports_corrupt_metadata() {
let dir = tempdir().unwrap(); let dir = tempdir().unwrap();
let store = FsStore::new(dir.path()).unwrap(); let store = FsStore::new(dir.path()).unwrap();
let pod_store = FsWorkerStore::new(dir.path().join("pods")).unwrap(); let worker_metadata_store = FsWorkerStore::new(dir.path().join("workers")).unwrap();
let pod_dir = dir.path().join("pods").join("broken"); let worker_metadata_dir = dir.path().join("workers").join("broken");
std::fs::create_dir_all(&pod_dir).unwrap(); std::fs::create_dir_all(&worker_metadata_dir).unwrap();
std::fs::write(pod_dir.join("metadata.json"), "{not-json").unwrap(); std::fs::write(worker_metadata_dir.join("metadata.json"), "{not-json").unwrap();
let records = read_stored_worker_infos(&store, &pod_store).unwrap(); let records = read_stored_worker_infos(&store, &worker_metadata_store).unwrap();
assert_eq!(records.len(), 1); assert_eq!(records.len(), 1);
assert_eq!(records[0].worker_name, "broken"); assert_eq!(records[0].worker_name, "broken");
assert!(matches!( assert!(matches!(
@ -1003,10 +1045,10 @@ mod tests {
fn read_stored_worker_infos_reads_metadata() { fn read_stored_worker_infos_reads_metadata() {
let dir = tempdir().unwrap(); let dir = tempdir().unwrap();
let store = FsStore::new(dir.path()).unwrap(); let store = FsStore::new(dir.path()).unwrap();
let pod_store = FsWorkerStore::new(dir.path().join("pods")).unwrap(); let worker_metadata_store = FsWorkerStore::new(dir.path().join("workers")).unwrap();
let session_id = new_session_id(); let session_id = new_session_id();
let segment_id = new_segment_id(); let segment_id = new_segment_id();
pod_store worker_metadata_store
.write(&WorkerMetadata::new( .write(&WorkerMetadata::new(
"agent", "agent",
Some(WorkerActiveSegmentRef::active_segment( Some(WorkerActiveSegmentRef::active_segment(
@ -1015,7 +1057,7 @@ mod tests {
)) ))
.unwrap(); .unwrap();
let records = read_stored_worker_infos(&store, &pod_store).unwrap(); let records = read_stored_worker_infos(&store, &worker_metadata_store).unwrap();
assert_eq!(records.len(), 1); assert_eq!(records.len(), 1);
assert_eq!(records[0].worker_name, "agent"); assert_eq!(records[0].worker_name, "agent");
assert_eq!(records[0].metadata_state, StoredMetadataState::Present); assert_eq!(records[0].metadata_state, StoredMetadataState::Present);

View File

@ -14,13 +14,11 @@ async-trait = { workspace = true }
clap = { version = "4.6.0", features = ["derive"] } clap = { version = "4.6.0", features = ["derive"] }
llm-engine = { workspace = true } llm-engine = { workspace = true }
session-store = { workspace = true } session-store = { workspace = true }
pod-store = { workspace = true }
manifest = { workspace = true } manifest = { workspace = true }
mcp = { workspace = true } mcp = { workspace = true }
protocol = { workspace = true } protocol = { workspace = true }
provider = { workspace = true } provider = { workspace = true }
client = { workspace = true } client = { workspace = true }
pod-registry = { workspace = true }
worker-runtime = { workspace = true, features = ["ws-server"], optional = true } worker-runtime = { workspace = true, features = ["ws-server"], optional = true }
serde = { workspace = true, features = ["derive"] } serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true } serde_json = { workspace = true }

View File

@ -10,7 +10,7 @@ Owns:
- Worker lifecycle and socket protocol serving - Worker lifecycle and socket protocol serving
- Engine construction around a resolved Manifest - Engine construction around a resolved Manifest
- session-store and pod-store coordination - session-store and session-store worker metadata coordination
- built-in tool registration under scope/policy - built-in tool registration under scope/policy
- spawned-child orchestration hooks - spawned-child orchestration hooks
@ -19,7 +19,7 @@ Does not own:
- provider-specific wire formats (`provider` / `llm-engine` clients) - provider-specific wire formats (`provider` / `llm-engine` clients)
- product CLI parsing (`yoi`) - product CLI parsing (`yoi`)
- TUI display authority (`tui`) - TUI display authority (`tui`)
- current-state storage schema outside Worker metadata (`pod-store`) - current-state storage schema outside Worker metadata (`session-store` worker metadata)
## Design notes ## Design notes

View File

@ -11,8 +11,8 @@
//! cargo run -p worker --example worker_cli //! cargo run -p worker --example worker_cli
//! ``` //! ```
use pod_store::{CombinedStore, FsWorkerStore};
use session_store::FsStore; use session_store::FsStore;
use session_store::{CombinedStore, FsWorkerStore};
use worker::{Worker, WorkerManifest, WorkerRunResult}; use worker::{Worker, WorkerManifest, WorkerRunResult};
fn manifest_toml(pwd: &std::path::Path) -> String { fn manifest_toml(pwd: &std::path::Path) -> String {

View File

@ -5,8 +5,8 @@
//! cargo run -p worker --example worker_protocol //! cargo run -p worker --example worker_protocol
//! ``` //! ```
use pod_store::{CombinedStore, FsWorkerStore};
use session_store::FsStore; use session_store::FsStore;
use session_store::{CombinedStore, FsWorkerStore};
use worker::{Event, Method, WorkerController}; use worker::{Event, Method, WorkerController};
fn manifest_toml(pwd: &std::path::Path) -> String { fn manifest_toml(pwd: &std::path::Path) -> String {

View File

@ -5,8 +5,8 @@ use std::sync::atomic::Ordering;
use llm_engine::EngineError; use llm_engine::EngineError;
use llm_engine::llm_client::client::LlmClient; use llm_engine::llm_client::client::LlmClient;
use manifest::TicketFeatureAccessConfig; use manifest::TicketFeatureAccessConfig;
use pod_store::WorkerMetadataStore;
use session_store::Store; use session_store::Store;
use session_store::WorkerMetadataStore;
use ticket::LocalTicketBackend; use ticket::LocalTicketBackend;
use ticket::config::TicketConfig; use ticket::config::TicketConfig;
use tokio::sync::{broadcast, mpsc, oneshot}; use tokio::sync::{broadcast, mpsc, oneshot};
@ -608,7 +608,7 @@ where
let spawner_name = worker.manifest().worker.name.clone(); let spawner_name = worker.manifest().worker.name.clone();
let spawner_manifest = worker.manifest().clone(); let spawner_manifest = worker.manifest().clone();
let prompts = worker.prompts().clone(); let prompts = worker.prompts().clone();
let pod_store = worker.store().clone(); let worker_metadata_store = worker.store().clone();
let self_parent_socket = worker.callback_socket().cloned(); let self_parent_socket = worker.callback_socket().cloned();
// The Worker's SharedScope (already augmented with the bash-output // The Worker's SharedScope (already augmented with the bash-output
@ -724,8 +724,13 @@ where
worker.register_tool(send_to_worker_tool(spawned_registry.clone())); worker.register_tool(send_to_worker_tool(spawned_registry.clone()));
worker.register_tool(read_worker_output_tool(spawned_registry.clone())); worker.register_tool(read_worker_output_tool(spawned_registry.clone()));
worker.register_tool(stop_worker_tool(spawned_registry.clone())); worker.register_tool(stop_worker_tool(spawned_registry.clone()));
let discovery = let discovery = WorkerDiscovery::new(
WorkerDiscovery::new(pod_store, spawner_name, runtime_base, cwd, spawned_registry); worker_metadata_store,
spawner_name,
runtime_base,
cwd,
spawned_registry,
);
worker.register_tool(list_workers_tool(discovery.clone())); worker.register_tool(list_workers_tool(discovery.clone()));
worker.register_tool(restore_worker_tool(discovery.clone())); worker.register_tool(restore_worker_tool(discovery.clone()));
worker.register_tool(send_to_peer_worker_tool(discovery)); worker.register_tool(send_to_peer_worker_tool(discovery));

View File

@ -18,19 +18,19 @@ use async_trait::async_trait;
use client::WorkerRuntimeCommand; use client::WorkerRuntimeCommand;
use llm_engine::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput}; use llm_engine::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput};
use manifest::{Permission, ScopeRule}; use manifest::{Permission, ScopeRule};
use pod_store::{
WorkerActiveSegmentRef, WorkerMetadata, WorkerMetadataStore, validate_worker_name,
};
use protocol::stream::JsonLineReader; use protocol::stream::JsonLineReader;
use protocol::{Event, Method, WorkerStatus}; use protocol::{Event, Method, WorkerStatus};
use schemars::JsonSchema; use schemars::JsonSchema;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use session_store::{SegmentId, SessionId}; use session_store::{SegmentId, SessionId};
use session_store::{
WorkerActiveSegmentRef, WorkerMetadata, WorkerMetadataStore, validate_worker_name,
};
use tokio::net::UnixStream; use tokio::net::UnixStream;
use tokio::process::Command; use tokio::process::Command;
use crate::runtime::dir::SpawnedWorkerRecord; use crate::runtime::dir::SpawnedWorkerRecord;
use crate::runtime::pod_registry; use crate::runtime::worker_allocation;
use crate::spawn::comm_tools::connect_and_send; use crate::spawn::comm_tools::connect_and_send;
use crate::spawn::registry::SpawnedWorkerRegistry; use crate::spawn::registry::SpawnedWorkerRegistry;
@ -705,9 +705,9 @@ pub enum WorkerDiscoveryError {
#[error("session store error: {0}")] #[error("session store error: {0}")]
Store(#[from] session_store::StoreError), Store(#[from] session_store::StoreError),
#[error("worker store error: {0}")] #[error("worker store error: {0}")]
WorkerStore(#[from] pod_store::WorkerStoreError), WorkerStore(#[from] session_store::WorkerStoreError),
#[error("scope lock error: {0}")] #[error("scope lock error: {0}")]
ScopeLock(#[from] pod_registry::ScopeLockError), ScopeLock(#[from] worker_allocation::ScopeLockError),
#[error("failed to launch restore process: {0}")] #[error("failed to launch restore process: {0}")]
RestoreSpawn(io::Error), RestoreSpawn(io::Error),
#[error("failed to launch restore runtime command `{command}`: {source}")] #[error("failed to launch restore runtime command `{command}`: {source}")]
@ -748,7 +748,7 @@ impl VisibilitySet {
} }
} }
fn comm_info_from_spawned_child(child: &pod_store::WorkerSpawnedChild) -> CommRegistryInfo { fn comm_info_from_spawned_child(child: &session_store::WorkerSpawnedChild) -> CommRegistryInfo {
let scope_delegated = child let scope_delegated = child
.scope_delegated .scope_delegated
.iter() .iter()
@ -773,7 +773,7 @@ fn comm_info_from_spawned_child(child: &pod_store::WorkerSpawnedChild) -> CommRe
} }
async fn summarize_spawned_children( async fn summarize_spawned_children(
children: &[pod_store::WorkerSpawnedChild], children: &[session_store::WorkerSpawnedChild],
) -> SpawnedChildrenSummary { ) -> SpawnedChildrenSummary {
let mut summary = SpawnedChildrenSummary { let mut summary = SpawnedChildrenSummary {
count: children.len(), count: children.len(),
@ -832,8 +832,8 @@ async fn probe_socket(socket_path: &Path) -> LiveInfo {
fn lookup_segment_lock( fn lookup_segment_lock(
segment_id: SegmentId, segment_id: SegmentId,
) -> Result<Option<pod_registry::SegmentLockInfo>, pod_registry::ScopeLockError> { ) -> Result<Option<worker_allocation::SegmentLockInfo>, worker_allocation::ScopeLockError> {
pod_registry::lookup_segment(segment_id) worker_allocation::lookup_segment(segment_id)
} }
#[derive(Debug, Deserialize, JsonSchema)] #[derive(Debug, Deserialize, JsonSchema)]
@ -1061,9 +1061,11 @@ mod tests {
use std::sync::Mutex; use std::sync::Mutex;
use manifest::{Permission, ScopeRule}; use manifest::{Permission, ScopeRule};
use pod_store::{FsWorkerStore, WorkerSpawnedChild, WorkerSpawnedScopeRule, WorkerStoreError};
use protocol::stream::JsonLineWriter; use protocol::stream::JsonLineWriter;
use protocol::{Alert, AlertLevel, AlertSource}; use protocol::{Alert, AlertLevel, AlertSource};
use session_store::{
FsWorkerStore, WorkerSpawnedChild, WorkerSpawnedScopeRule, WorkerStoreError,
};
use session_store::{new_segment_id, new_session_id}; use session_store::{new_segment_id, new_session_id};
use tempfile::TempDir; use tempfile::TempDir;
use tokio::net::UnixListener; use tokio::net::UnixListener;
@ -1143,7 +1145,7 @@ mod tests {
child("child-pending", &pending_socket), child("child-pending", &pending_socket),
], ],
reclaimed_children: Vec::new(), reclaimed_children: Vec::new(),
peers: vec![pod_store::WorkerPeer { peers: vec![session_store::WorkerPeer {
worker_name: "peer".into(), worker_name: "peer".into(),
}], }],
resolved_manifest_snapshot: None, resolved_manifest_snapshot: None,
@ -1209,7 +1211,7 @@ mod tests {
workspace_root: None, workspace_root: None,
spawned_children: Vec::new(), spawned_children: Vec::new(),
reclaimed_children: Vec::new(), reclaimed_children: Vec::new(),
peers: vec![pod_store::WorkerPeer { peers: vec![session_store::WorkerPeer {
worker_name: "parent".into(), worker_name: "parent".into(),
}], }],
resolved_manifest_snapshot: None, resolved_manifest_snapshot: None,
@ -1317,7 +1319,7 @@ mod tests {
assert!(matches!(restore_plan, RestorePlan::Restore { .. })); assert!(matches!(restore_plan, RestorePlan::Restore { .. }));
let lock_socket = runtime_base.join("lock-owner.sock"); let lock_socket = runtime_base.join("lock-owner.sock");
let _guard = pod_registry::install_top_level( let _guard = worker_allocation::install_top_level(
"lock-owner".into(), "lock-owner".into(),
std::process::id(), std::process::id(),
lock_socket.clone(), lock_socket.clone(),
@ -1415,7 +1417,7 @@ mod tests {
workspace_root: None, workspace_root: None,
spawned_children: Vec::new(), spawned_children: Vec::new(),
reclaimed_children: Vec::new(), reclaimed_children: Vec::new(),
peers: vec![pod_store::WorkerPeer { peers: vec![session_store::WorkerPeer {
worker_name: "target".into(), worker_name: "target".into(),
}], }],
resolved_manifest_snapshot: None, resolved_manifest_snapshot: None,
@ -1455,7 +1457,7 @@ mod tests {
workspace_root: None, workspace_root: None,
spawned_children: Vec::new(), spawned_children: Vec::new(),
reclaimed_children: Vec::new(), reclaimed_children: Vec::new(),
peers: vec![pod_store::WorkerPeer { peers: vec![session_store::WorkerPeer {
worker_name: "target".into(), worker_name: "target".into(),
}], }],
resolved_manifest_snapshot: None, resolved_manifest_snapshot: None,
@ -1468,7 +1470,7 @@ mod tests {
workspace_root: None, workspace_root: None,
spawned_children: Vec::new(), spawned_children: Vec::new(),
reclaimed_children: Vec::new(), reclaimed_children: Vec::new(),
peers: vec![pod_store::WorkerPeer { peers: vec![session_store::WorkerPeer {
worker_name: "source".into(), worker_name: "source".into(),
}], }],
resolved_manifest_snapshot: None, resolved_manifest_snapshot: None,
@ -1573,7 +1575,7 @@ mod tests {
workspace_root: None, workspace_root: None,
spawned_children: Vec::new(), spawned_children: Vec::new(),
reclaimed_children: Vec::new(), reclaimed_children: Vec::new(),
peers: vec![pod_store::WorkerPeer { peers: vec![session_store::WorkerPeer {
worker_name: "target".into(), worker_name: "target".into(),
}], }],
resolved_manifest_snapshot: None, resolved_manifest_snapshot: None,
@ -1586,7 +1588,7 @@ mod tests {
workspace_root: None, workspace_root: None,
spawned_children: Vec::new(), spawned_children: Vec::new(),
reclaimed_children: Vec::new(), reclaimed_children: Vec::new(),
peers: vec![pod_store::WorkerPeer { peers: vec![session_store::WorkerPeer {
worker_name: "source".into(), worker_name: "source".into(),
}], }],
resolved_manifest_snapshot: None, resolved_manifest_snapshot: None,

View File

@ -9,7 +9,7 @@ use manifest::{
WorkerManifest, WorkerManifestConfig, paths, WorkerManifest, WorkerManifestConfig, paths,
plugin::{PluginDiscoveryOptions, resolve_plugin_config_for_startup}, plugin::{PluginDiscoveryOptions, resolve_plugin_config_for_startup},
}; };
use pod_store::{CombinedStore, FsWorkerStore, WorkerMetadataStore}; use session_store::{CombinedStore, FsWorkerStore, WorkerMetadataStore};
use session_store::{FsStore, SegmentId, Store}; use session_store::{FsStore, SegmentId, Store};
use ticket::config::TicketRole; use ticket::config::TicketRole;
@ -85,7 +85,7 @@ struct Cli {
/// Restore a Worker from an existing session. The Worker re-uses the /// Restore a Worker from an existing session. The Worker re-uses the
/// given session id and appends new turns to the same jsonl; /// given session id and appends new turns to the same jsonl;
/// concurrent writers are prevented by the pod-registry. /// concurrent writers are prevented by the worker-allocation.
/// Mutually exclusive with `--adopt` (spawned children always start /// Mutually exclusive with `--adopt` (spawned children always start
/// fresh). /// fresh).
#[arg(long, value_name = "UUID", conflicts_with_all = ["adopt"])] #[arg(long, value_name = "UUID", conflicts_with_all = ["adopt"])]
@ -484,21 +484,21 @@ async fn run_cli_inner(cli: Cli) -> ExitCode {
return ExitCode::FAILURE; return ExitCode::FAILURE;
} }
}; };
let pod_store_dir = match paths::data_dir() { let worker_metadata_dir = match paths::data_dir() {
Some(data_dir) => data_dir.join("pods"), Some(data_dir) => data_dir.join("workers"),
None => store_dir None => store_dir
.parent() .parent()
.map(|parent| parent.join("pods")) .map(|parent| parent.join("workers"))
.unwrap_or_else(|| PathBuf::from("workers")), .unwrap_or_else(|| PathBuf::from("workers")),
}; };
let pod_store = match FsWorkerStore::new(&pod_store_dir) { let worker_metadata_store = match FsWorkerStore::new(&worker_metadata_dir) {
Ok(s) => s, Ok(s) => s,
Err(e) => { Err(e) => {
eprintln!("error: failed to initialize worker store at {pod_store_dir:?}: {e}"); eprintln!("error: failed to initialize worker store at {worker_metadata_dir:?}: {e}");
return ExitCode::FAILURE; return ExitCode::FAILURE;
} }
}; };
let store = CombinedStore::new(session_store, pod_store); let store = CombinedStore::new(session_store, worker_metadata_store);
let mut worker = if cli.adopt { let mut worker = if cli.adopt {
let callback = match cli.callback.clone() { let callback = match cli.callback.clone() {

View File

@ -10,7 +10,7 @@
//! parent's notification buffer. Control-plane-only variants may still have //! parent's notification buffer. Control-plane-only variants may still have
//! a renderer for diagnostics, but receive-side classification keeps them //! a renderer for diagnostics, but receive-side classification keeps them
//! out of LLM history/context. //! out of LLM history/context.
//! - **Apply side effects** on the parent (registry / pod-registry //! - **Apply side effects** on the parent (registry / worker-allocation
//! updates) so that the receive path is idempotent and tolerant of //! updates) so that the receive path is idempotent and tolerant of
//! out-of-order delivery. //! out-of-order delivery.
//! //!

View File

@ -1,2 +1,2 @@
pub mod dir; pub mod dir;
pub use ::pod_registry; pub mod worker_allocation;

View File

@ -0,0 +1,29 @@
//! Process-local Worker allocation table used only for scope ownership checks.
//!
//! This module is intentionally not a runtime identity store. Runtime Worker
//! identity, creation and durable persistence remain owned by worker-runtime
//! fs-store plus its execution backend mapping; this table coordinates
//! in-process scope delegation while a Worker is running.
mod conflict;
mod error;
mod lifecycle;
mod mutate;
mod table;
#[cfg(test)]
mod test_util;
pub use conflict::{
ConflictOwner, find_conflict_owner, find_conflict_owners, is_within_effective_write,
};
pub use error::ScopeLockError;
pub use lifecycle::{
ScopeAllocationGuard, SegmentLockInfo, adopt_allocation, install_top_level,
install_top_level_with_deny, lookup_segment, update_segment,
};
pub use mutate::{
delegate_scope, reclaim_delegated_scope, reclaim_stale, reclaim_stale_with, register_worker,
register_worker_with_deny, release_worker,
};
pub use table::{Allocation, LockFile, LockFileGuard, default_allocation_path};

View File

@ -6,7 +6,7 @@
use manifest::{Permission, ScopeRule}; use manifest::{Permission, ScopeRule};
use crate::table::{Allocation, LockFile}; use super::table::{Allocation, LockFile};
/// Whether `a` and `b` claim any overlapping concrete path. /// Whether `a` and `b` claim any overlapping concrete path.
/// ///
@ -156,9 +156,11 @@ fn find_conflict_in_subtree(
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::super::test_util::*;
use super::super::{
ScopeLockError, delegate_scope, register_worker, register_worker_with_deny,
};
use super::*; use super::*;
use crate::test_util::*;
use crate::{ScopeLockError, delegate_scope, register_pod, register_worker_with_deny};
use tempfile::TempDir; use tempfile::TempDir;
#[test] #[test]

View File

@ -1,4 +1,4 @@
//! Error type for mutating pod-registry operations. //! Error type for mutating Worker allocation operations.
use std::io; use std::io;
use std::path::PathBuf; use std::path::PathBuf;
@ -6,14 +6,14 @@ use std::path::PathBuf;
use manifest::{ScopeError, ScopeRule}; use manifest::{ScopeError, ScopeRule};
use session_store::SegmentId; use session_store::SegmentId;
/// Errors raised by the mutating pod-registry operations. /// Errors raised by the mutating Worker allocation operations.
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
pub enum ScopeLockError { pub enum ScopeLockError {
#[error("I/O error on workers.json: {0}")] #[error("I/O error on workers.json: {0}")]
Io(#[from] io::Error), Io(#[from] io::Error),
#[error("pod name `{0}` is already registered")] #[error("worker `{0}` is already allocated")]
DuplicateWorkerName(String), DuplicateWorkerName(String),
#[error("requested scope `{}` conflicts with pod `{competitor}` rule `{}`", .rule.target.display(), .competitor_rule.target.display())] #[error("requested scope `{}` conflicts with worker allocation `{competitor}` rule `{}`", .rule.target.display(), .competitor_rule.target.display())]
WriteConflict { WriteConflict {
competitor: String, competitor: String,
rule: ScopeRule, rule: ScopeRule,
@ -26,10 +26,10 @@ pub enum ScopeLockError {
NotSubset { spawner: String, rule: ScopeRule }, NotSubset { spawner: String, rule: ScopeRule },
#[error("invalid delegation scope: {source}")] #[error("invalid delegation scope: {source}")]
InvalidScope { source: ScopeError }, InvalidScope { source: ScopeError },
#[error("pod `{0}` is not registered")] #[error("worker `{0}` is not allocated")]
UnknownWorker(String), UnknownWorker(String),
#[error( #[error(
"session {segment_id} is already held by pod `{worker_name}` at {}", "session {segment_id} is already allocated to worker `{worker_name}` at {}",
.socket.display() .socket.display()
)] )]
SegmentConflict { SegmentConflict {

View File

@ -1,5 +1,5 @@
//! Owned-allocation guards and the high-level entry points that open //! Owned-allocation guards and the high-level entry points that open
//! the default registry path, mutate it, and return a guard that cleans //! the default worker allocation path, mutate it, and return a guard that cleans
//! up on drop. //! up on drop.
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
@ -7,9 +7,9 @@ use std::path::{Path, PathBuf};
use manifest::ScopeRule; use manifest::ScopeRule;
use session_store::SegmentId; use session_store::SegmentId;
use crate::error::ScopeLockError; use super::error::ScopeLockError;
use crate::mutate::release_worker; use super::mutate::release_worker;
use crate::table::{LockFileGuard, default_registry_path}; use super::table::{LockFileGuard, default_allocation_path};
/// Owned allocation: on drop, opens the lock file and releases this /// Owned allocation: on drop, opens the lock file and releases this
/// Worker's entry. The guard keeps only the name + lock-file path; it /// Worker's entry. The guard keeps only the name + lock-file path; it
@ -68,9 +68,9 @@ pub fn install_top_level_with_deny(
scope_deny: Vec<ScopeRule>, scope_deny: Vec<ScopeRule>,
segment_id: SegmentId, segment_id: SegmentId,
) -> Result<ScopeAllocationGuard, ScopeLockError> { ) -> Result<ScopeAllocationGuard, ScopeLockError> {
let lock_path = default_registry_path()?; let lock_path = default_allocation_path()?;
let mut guard = LockFileGuard::open(&lock_path)?; let mut guard = LockFileGuard::open(&lock_path)?;
crate::mutate::register_worker_with_deny( super::mutate::register_worker_with_deny(
&mut guard, &mut guard,
worker_name.clone(), worker_name.clone(),
pid, pid,
@ -99,7 +99,7 @@ pub fn adopt_allocation(
new_pid: u32, new_pid: u32,
segment_id: SegmentId, segment_id: SegmentId,
) -> Result<ScopeAllocationGuard, ScopeLockError> { ) -> Result<ScopeAllocationGuard, ScopeLockError> {
let lock_path = default_registry_path()?; let lock_path = default_allocation_path()?;
let mut guard = LockFileGuard::open(&lock_path)?; let mut guard = LockFileGuard::open(&lock_path)?;
let alloc = guard let alloc = guard
.data_mut() .data_mut()
@ -134,7 +134,7 @@ pub fn adopt_allocation(
/// guard, so the segment_id collision check is atomic with the /// guard, so the segment_id collision check is atomic with the
/// rewrite. /// rewrite.
pub fn update_segment(worker_name: &str, new_segment_id: SegmentId) -> Result<(), ScopeLockError> { pub fn update_segment(worker_name: &str, new_segment_id: SegmentId) -> Result<(), ScopeLockError> {
let lock_path = default_registry_path()?; let lock_path = default_allocation_path()?;
let mut guard = LockFileGuard::open(&lock_path)?; let mut guard = LockFileGuard::open(&lock_path)?;
if let Some(other) = guard.data().find_by_segment(new_segment_id) { if let Some(other) = guard.data().find_by_segment(new_segment_id) {
if other.worker_name != worker_name { if other.worker_name != worker_name {
@ -169,9 +169,9 @@ pub struct SegmentLockInfo {
/// Used by `Worker::restore_from_manifest` to refuse a resume that would /// Used by `Worker::restore_from_manifest` to refuse a resume that would
/// race a live writer on the same source session. /// race a live writer on the same source session.
pub fn lookup_segment(segment_id: SegmentId) -> Result<Option<SegmentLockInfo>, ScopeLockError> { pub fn lookup_segment(segment_id: SegmentId) -> Result<Option<SegmentLockInfo>, ScopeLockError> {
let lock_path = default_registry_path()?; let lock_path = default_allocation_path()?;
let mut guard = LockFileGuard::open(&lock_path)?; let mut guard = LockFileGuard::open(&lock_path)?;
crate::mutate::reclaim_stale(&mut guard); super::mutate::reclaim_stale(&mut guard);
Ok(guard Ok(guard
.data() .data()
.find_by_segment(segment_id) .find_by_segment(segment_id)
@ -184,9 +184,9 @@ pub fn lookup_segment(segment_id: SegmentId) -> Result<Option<SegmentLockInfo>,
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::super::table::Allocation;
use super::super::test_util::*;
use super::*; use super::*;
use crate::table::Allocation;
use crate::test_util::*;
use tempfile::TempDir; use tempfile::TempDir;
/// Mimic what the spawner does before the child comes up: push an /// Mimic what the spawner does before the child comes up: push an

View File

@ -1,5 +1,5 @@
//! Mutating operations over the allocation table. All of these expect //! Mutating operations over the allocation table. All of these expect
//! the caller to hold a [`LockFileGuard`] for the registry's lock file. //! the caller to hold a [`LockFileGuard`] for the worker allocation's lock file.
use std::io; use std::io;
use std::path::PathBuf; use std::path::PathBuf;
@ -7,9 +7,9 @@ use std::path::PathBuf;
use manifest::{DelegationScope, Permission, ScopeRule}; use manifest::{DelegationScope, Permission, ScopeRule};
use session_store::SegmentId; use session_store::SegmentId;
use crate::conflict::{find_conflict_owner, find_conflict_owners}; use super::conflict::{find_conflict_owner, find_conflict_owners};
use crate::error::ScopeLockError; use super::error::ScopeLockError;
use crate::table::{Allocation, LockFileGuard}; use super::table::{Allocation, LockFileGuard};
/// Register a top-level Worker (started directly by a human, no /// Register a top-level Worker (started directly by a human, no
/// delegation parent). Reclaims stale entries before checking /// delegation parent). Reclaims stale entries before checking
@ -46,7 +46,7 @@ pub fn register_worker(
/// competitor.rule), not relational — it does not verify that the /// competitor.rule), not relational — it does not verify that the
/// competitor actually descends from this Worker's prior delegations. /// competitor actually descends from this Worker's prior delegations.
/// In practice this is safe because the canonical restore caller derives /// In practice this is safe because the canonical restore caller derives
/// `scope_deny` from outstanding `pod-store` child delegations, so any /// `scope_deny` from outstanding child worker metadata delegations, so any
/// covered competitor is expected to be a descendant of the original /// covered competitor is expected to be a descendant of the original
/// allocation. Direct callers must uphold the same invariant. /// allocation. Direct callers must uphold the same invariant.
pub fn register_worker_with_deny( pub fn register_worker_with_deny(
@ -79,7 +79,7 @@ pub fn register_worker_with_deny(
scope_deny scope_deny
.iter() .iter()
.filter(|r| r.permission == Permission::Write) .filter(|r| r.permission == Permission::Write)
.any(|deny| crate::conflict::covers_fully(deny, &owner.rule)) .any(|deny| super::conflict::covers_fully(deny, &owner.rule))
}); });
if all_denied { if all_denied {
continue; continue;
@ -286,9 +286,9 @@ fn pid_alive(pid: u32) -> bool {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::super::is_within_effective_write;
use super::super::test_util::*;
use super::*; use super::*;
use crate::is_within_effective_write;
use crate::test_util::*;
use tempfile::TempDir; use tempfile::TempDir;
#[test] #[test]

View File

@ -49,7 +49,7 @@ pub struct Allocation {
/// a top-level Worker started directly by a human. /// a top-level Worker started directly by a human.
pub delegated_from: Option<String>, pub delegated_from: Option<String>,
/// Segment ID this Worker is currently writing to. `None` means this /// Segment ID this Worker is currently writing to. `None` means this
/// is a pre-reservation made by a spawner via [`crate::delegate_scope`] /// is a pre-reservation made by a spawner via [`super::super::delegate_scope`]
/// before the child has come up; the child fills it in at /// before the child has come up; the child fills it in at
/// [`crate::adopt_allocation`] time. /// [`crate::adopt_allocation`] time.
#[serde(default)] #[serde(default)]
@ -79,11 +79,11 @@ impl LockFile {
} }
/// Default on-disk path: `<runtime_dir>/workers.json` resolved via /// Default on-disk path: `<runtime_dir>/workers.json` resolved via
/// [`manifest::paths::pod_registry_path`]. Tests should point this /// [`manifest::paths::worker_allocation_path`]. Tests should point this
/// elsewhere by setting `YOI_HOME` or `YOI_RUNTIME_DIR` to a /// elsewhere by setting `YOI_HOME` or `YOI_RUNTIME_DIR` to a
/// tempdir. /// tempdir.
pub fn default_registry_path() -> io::Result<PathBuf> { pub fn default_allocation_path() -> io::Result<PathBuf> {
paths::pod_registry_path().ok_or_else(|| { paths::worker_allocation_path().ok_or_else(|| {
io::Error::new( io::Error::new(
io::ErrorKind::NotFound, io::ErrorKind::NotFound,
"could not resolve workers.json path (no YOI_HOME / \ "could not resolve workers.json path (no YOI_HOME / \
@ -137,7 +137,7 @@ impl LockFileGuard {
return Err(io::Error::new( return Err(io::Error::new(
io::ErrorKind::TimedOut, io::ErrorKind::TimedOut,
format!( format!(
"timed out waiting for pod registry lock `{}`", "timed out waiting for worker allocation lock `{}`",
path.display() path.display()
), ),
)); ));
@ -149,7 +149,7 @@ impl LockFileGuard {
return Err(io::Error::new( return Err(io::Error::new(
io::ErrorKind::TimedOut, io::ErrorKind::TimedOut,
format!( format!(
"timed out waiting for pod registry lock `{}`", "timed out waiting for worker allocation lock `{}`",
path.display() path.display()
), ),
)); ));
@ -211,9 +211,9 @@ impl Drop for LockFileGuard {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::super::register_worker;
use super::super::test_util::*;
use super::*; use super::*;
use crate::register_pod;
use crate::test_util::*;
use tempfile::TempDir; use tempfile::TempDir;
#[test] #[test]
@ -277,7 +277,7 @@ mod tests {
sid(), sid(),
) )
.unwrap(); .unwrap();
crate::delegate_scope( super::super::delegate_scope(
&mut g, &mut g,
"parent", "parent",
"child".into(), "child".into(),

View File

@ -1,6 +1,6 @@
//! Shared test helpers for the pod-registry crate. //! Shared test helpers for the pod-worker allocation crate.
//! //!
//! Visible to all `#[cfg(test)]` modules under `crate::test_util::*`. //! Visible to all `#[cfg(test)]` modules under `super::test_util::*`.
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::{LazyLock, Mutex, MutexGuard}; use std::sync::{LazyLock, Mutex, MutexGuard};
@ -8,7 +8,7 @@ use std::sync::{LazyLock, Mutex, MutexGuard};
use manifest::{DelegationScope, Permission, ScopeConfig, ScopeRule}; use manifest::{DelegationScope, Permission, ScopeConfig, ScopeRule};
use session_store::SegmentId; use session_store::SegmentId;
use crate::table::LockFileGuard; use super::table::LockFileGuard;
pub(crate) fn sid() -> SegmentId { pub(crate) fn sid() -> SegmentId {
session_store::new_segment_id() session_store::new_segment_id()
@ -17,7 +17,7 @@ pub(crate) fn sid() -> SegmentId {
/// Serialises tests that mutate runtime-dir env vars. The test /// Serialises tests that mutate runtime-dir env vars. The test
/// harness runs tests on multiple threads inside a single process, /// harness runs tests on multiple threads inside a single process,
/// so env-var writes from one test would otherwise leak into a /// so env-var writes from one test would otherwise leak into a
/// parallel test's `default_registry_path()` lookup. /// parallel test's `default_allocation_path()` lookup.
pub(crate) static ENV_LOCK: LazyLock<Mutex<()>> = LazyLock::new(|| Mutex::new(())); pub(crate) static ENV_LOCK: LazyLock<Mutex<()>> = LazyLock::new(|| Mutex::new(()));
/// Sandbox `YOI_RUNTIME_DIR` to a tempdir for the duration of /// Sandbox `YOI_RUNTIME_DIR` to a tempdir for the duration of

View File

@ -16,9 +16,9 @@ use std::time::Duration;
use async_trait::async_trait; use async_trait::async_trait;
use manifest::paths; use manifest::paths;
use pod_store::{CombinedStore, FsWorkerStore};
use protocol::{Method, Segment, WorkerStatus}; use protocol::{Method, Segment, WorkerStatus};
use session_store::FsStore; use session_store::FsStore;
use session_store::{CombinedStore, FsWorkerStore};
use tokio::runtime::Runtime; use tokio::runtime::Runtime;
use tokio::sync::broadcast; use tokio::sync::broadcast;
use worker_runtime::execution::{ use worker_runtime::execution::{
@ -49,7 +49,7 @@ pub struct ProfileRuntimeWorkerFactory {
workspace_root: PathBuf, workspace_root: PathBuf,
cwd: PathBuf, cwd: PathBuf,
store_dir: Option<PathBuf>, store_dir: Option<PathBuf>,
pod_store_dir: Option<PathBuf>, worker_metadata_dir: Option<PathBuf>,
profile: Option<String>, profile: Option<String>,
runtime_base_dir: Option<PathBuf>, runtime_base_dir: Option<PathBuf>,
} }
@ -61,7 +61,7 @@ impl ProfileRuntimeWorkerFactory {
cwd: workspace_root.clone(), cwd: workspace_root.clone(),
workspace_root, workspace_root,
store_dir: None, store_dir: None,
pod_store_dir: None, worker_metadata_dir: None,
profile: None, profile: None,
runtime_base_dir: None, runtime_base_dir: None,
} }
@ -77,8 +77,8 @@ impl ProfileRuntimeWorkerFactory {
self self
} }
pub fn with_pod_store_dir(mut self, pod_store_dir: impl Into<PathBuf>) -> Self { pub fn with_worker_metadata_dir(mut self, worker_metadata_dir: impl Into<PathBuf>) -> Self {
self.pod_store_dir = Some(pod_store_dir.into()); self.worker_metadata_dir = Some(worker_metadata_dir.into());
self self
} }
@ -104,11 +104,11 @@ impl ProfileRuntimeWorkerFactory {
}) })
} }
fn pod_store_dir(&self, store_dir: &std::path::Path) -> PathBuf { fn worker_metadata_dir(&self, store_dir: &std::path::Path) -> PathBuf {
self.pod_store_dir self.worker_metadata_dir
.clone() .clone()
.or_else(|| paths::data_dir().map(|data_dir| data_dir.join("pods"))) .or_else(|| paths::data_dir().map(|data_dir| data_dir.join("workers")))
.or_else(|| store_dir.parent().map(|parent| parent.join("pods"))) .or_else(|| store_dir.parent().map(|parent| parent.join("workers")))
.unwrap_or_else(|| PathBuf::from("workers")) .unwrap_or_else(|| PathBuf::from("workers"))
} }
@ -174,14 +174,14 @@ impl RuntimeWorkerFactory for ProfileRuntimeWorkerFactory {
store_dir.display() store_dir.display()
) )
})?; })?;
let pod_store_dir = self.pod_store_dir(&store_dir); let worker_metadata_dir = self.worker_metadata_dir(&store_dir);
let pod_store = FsWorkerStore::new(&pod_store_dir).map_err(|err| { let worker_metadata_store = FsWorkerStore::new(&worker_metadata_dir).map_err(|err| {
format!( format!(
"failed to initialize worker metadata store at {}: {err}", "failed to initialize worker metadata store at {}: {err}",
pod_store_dir.display() worker_metadata_dir.display()
) )
})?; })?;
let store = CombinedStore::new(session_store, pod_store); let store = CombinedStore::new(session_store, worker_metadata_store);
let worker = Worker::from_manifest_with_context( let worker = Worker::from_manifest_with_context(
manifest, manifest,
@ -558,7 +558,7 @@ mod tests {
runtime_base: PathBuf, runtime_base: PathBuf,
cwd: PathBuf, cwd: PathBuf,
store_dir: PathBuf, store_dir: PathBuf,
pod_store_dir: PathBuf, worker_metadata_dir: PathBuf,
} }
#[async_trait] #[async_trait]
@ -588,7 +588,7 @@ mod tests {
.map_err(|err| err.to_string())?; .map_err(|err| err.to_string())?;
let store = CombinedStore::new( let store = CombinedStore::new(
FsStore::new(&self.store_dir).map_err(|err| err.to_string())?, FsStore::new(&self.store_dir).map_err(|err| err.to_string())?,
FsWorkerStore::new(&self.pod_store_dir).map_err(|err| err.to_string())?, FsWorkerStore::new(&self.worker_metadata_dir).map_err(|err| err.to_string())?,
); );
let scope = Scope::writable(&self.cwd).map_err(|err| err.to_string())?; let scope = Scope::writable(&self.cwd).map_err(|err| err.to_string())?;
let worker = Worker::new( let worker = Worker::new(
@ -662,7 +662,7 @@ mod tests {
runtime_base: runtime_base.path().to_path_buf(), runtime_base: runtime_base.path().to_path_buf(),
cwd: cwd.path().to_path_buf(), cwd: cwd.path().to_path_buf(),
store_dir: store.path().join("sessions"), store_dir: store.path().join("sessions"),
pod_store_dir: store.path().join("pods"), worker_metadata_dir: store.path().join("workers"),
}; };
let backend = WorkerRuntimeExecutionBackend::new(factory).unwrap(); let backend = WorkerRuntimeExecutionBackend::new(factory).unwrap();
let runtime = EmbeddedRuntime::with_execution_backend( let runtime = EmbeddedRuntime::with_execution_backend(

View File

@ -21,7 +21,7 @@ use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use manifest::{Permission, ScopeRule, SharedScope}; use manifest::{Permission, ScopeRule, SharedScope};
use pod_store::{ use session_store::{
WorkerMetadataStore, WorkerReclaimedChild, WorkerSpawnedChild, WorkerSpawnedScopeRule, WorkerMetadataStore, WorkerReclaimedChild, WorkerSpawnedChild, WorkerSpawnedScopeRule,
WorkerStoreError, WorkerStoreError,
}; };
@ -30,7 +30,7 @@ use tokio::sync::Mutex;
use tracing::warn; use tracing::warn;
use crate::runtime::dir::{RuntimeDir, SpawnedWorkerRecord}; use crate::runtime::dir::{RuntimeDir, SpawnedWorkerRecord};
use crate::runtime::pod_registry; use crate::runtime::worker_allocation;
type RegistryStateWriter = Arc<dyn Fn(&[SpawnedWorkerRecord]) -> io::Result<()> + Send + Sync>; type RegistryStateWriter = Arc<dyn Fn(&[SpawnedWorkerRecord]) -> io::Result<()> + Send + Sync>;
type RegistryReclaimWriter = Arc<dyn Fn(&SpawnedWorkerRecord) -> io::Result<()> + Send + Sync>; type RegistryReclaimWriter = Arc<dyn Fn(&SpawnedWorkerRecord) -> io::Result<()> + Send + Sync>;
@ -339,11 +339,11 @@ fn reclaim_record(
.cloned() .cloned()
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let lock_path = pod_registry::default_registry_path() let lock_path = worker_allocation::default_allocation_path()
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
let mut guard = pod_registry::LockFileGuard::open(&lock_path) let mut guard = worker_allocation::LockFileGuard::open(&lock_path)
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
pod_registry::reclaim_delegated_scope( worker_allocation::reclaim_delegated_scope(
&mut guard, &mut guard,
parent_name, parent_name,
&record.worker_name, &record.worker_name,
@ -361,12 +361,12 @@ fn reclaim_record(
} }
fn release_child_allocation(worker_name: &str) -> io::Result<()> { fn release_child_allocation(worker_name: &str) -> io::Result<()> {
let lock_path = pod_registry::default_registry_path() let lock_path = worker_allocation::default_allocation_path()
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
let mut guard = pod_registry::LockFileGuard::open(&lock_path) let mut guard = worker_allocation::LockFileGuard::open(&lock_path)
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
match pod_registry::release_worker(&mut guard, worker_name) { match worker_allocation::release_worker(&mut guard, worker_name) {
Ok(()) | Err(pod_registry::ScopeLockError::UnknownWorker(_)) => Ok(()), Ok(()) | Err(worker_allocation::ScopeLockError::UnknownWorker(_)) => Ok(()),
Err(err) => Err(io::Error::new(io::ErrorKind::Other, err)), Err(err) => Err(io::Error::new(io::ErrorKind::Other, err)),
} }
} }

View File

@ -1,9 +1,9 @@
//! `SpawnWorker` tool — launch a new Worker process as a child of this one. //! `SpawnWorker` tool — launch a new Worker process as a child of this one.
//! //!
//! Wires pod-registry delegation, child manifest-config construction, subprocess //! Wires worker-allocation delegation, child manifest-config construction, subprocess
//! launch, and socket handoff into a single `Tool` implementation. When //! launch, and socket handoff into a single `Tool` implementation. When
//! the LLM calls `SpawnWorker`, a fresh Worker runtime command is exec'd in its own //! the LLM calls `SpawnWorker`, a fresh Worker runtime command is exec'd in its own
//! process group, the pod-registry is updated atomically, and the child's //! process group, the worker-allocation is updated atomically, and the child's
//! first turn is kicked off by handing its socket a `Method::Run`. //! first turn is kicked off by handing its socket a `Method::Run`.
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
@ -29,7 +29,7 @@ use tokio::time::sleep;
use crate::ipc::event; use crate::ipc::event;
use crate::prompt::catalog::PromptCatalog; use crate::prompt::catalog::PromptCatalog;
use crate::runtime::dir::SpawnedWorkerRecord; use crate::runtime::dir::SpawnedWorkerRecord;
use crate::runtime::pod_registry::{self, LockFileGuard, ScopeLockError}; use crate::runtime::worker_allocation::{self, LockFileGuard, ScopeLockError};
use crate::spawn::comm_tools::{SendRunError, send_run_and_confirm}; use crate::spawn::comm_tools::{SendRunError, send_run_and_confirm};
use crate::spawn::registry::SpawnedWorkerRegistry; use crate::spawn::registry::SpawnedWorkerRegistry;
use protocol::WorkerEvent; use protocol::WorkerEvent;
@ -216,7 +216,7 @@ fn parse_spawn_profile_selector(raw: Option<&str>) -> Result<SpawnProfileSelecto
/// controller once per Worker lifetime. /// controller once per Worker lifetime.
pub struct SpawnWorkerTool { pub struct SpawnWorkerTool {
/// Spawner's own worker name — becomes the spawned Worker's /// Spawner's own worker name — becomes the spawned Worker's
/// `delegated_from` in the pod-registry. /// `delegated_from` in the worker-allocation.
spawner_name: String, spawner_name: String,
/// Path to the spawner's Unix socket. Handed to the child via /// Path to the spawner's Unix socket. Handed to the child via
/// `--callback` so its `WorkerEvent` callbacks have somewhere to land. /// `--callback` so its `WorkerEvent` callbacks have somewhere to land.
@ -254,7 +254,7 @@ pub struct SpawnWorkerTool {
/// `Permission::Write` rules in the delegated scope are revoked /// `Permission::Write` rules in the delegated scope are revoked
/// from the spawner's in-memory view (a `deny(Write, target)` is /// from the spawner's in-memory view (a `deny(Write, target)` is
/// pushed on top, downgrading the spawner's effective access on /// pushed on top, downgrading the spawner's effective access on
/// those paths to `Read`). Mirrors the pod-registry's /// those paths to `Read`). Mirrors the worker-allocation's
/// `effective_write` semantics: Write is the only permission /// `effective_write` semantics: Write is the only permission
/// tracked across Workers, so revocation only touches Write. /// tracked across Workers, so revocation only touches Write.
spawner_scope: SharedScope, spawner_scope: SharedScope,
@ -337,15 +337,15 @@ impl Tool for SpawnWorkerTool {
.map_err(|e| ToolError::InvalidArgument(format!("{e}")))?; .map_err(|e| ToolError::InvalidArgument(format!("{e}")))?;
let predicted_socket = self.runtime_base.join(&input.name).join("sock"); let predicted_socket = self.runtime_base.join(&input.name).join("sock");
let lock_path = pod_registry::default_registry_path() let lock_path = worker_allocation::default_allocation_path()
.map_err(|e| ToolError::ExecutionFailed(format!("pod-registry path: {e}")))?; .map_err(|e| ToolError::ExecutionFailed(format!("worker-allocation path: {e}")))?;
// Reserve the allocation up front. Spawner's pid is a live // Reserve the allocation up front. Spawner's pid is a live
// placeholder; the child will rewrite it via `adopt_allocation`. // placeholder; the child will rewrite it via `adopt_allocation`.
{ {
let mut guard = LockFileGuard::open(&lock_path) let mut guard = LockFileGuard::open(&lock_path)
.map_err(|e| ToolError::ExecutionFailed(format!("pod-registry open: {e}")))?; .map_err(|e| ToolError::ExecutionFailed(format!("worker-allocation open: {e}")))?;
pod_registry::delegate_scope( worker_allocation::delegate_scope(
&mut guard, &mut guard,
&self.spawner_name, &self.spawner_name,
input.name.clone(), input.name.clone(),
@ -354,7 +354,7 @@ impl Tool for SpawnWorkerTool {
scope_allow.clone(), scope_allow.clone(),
&self.delegation_scope, &self.delegation_scope,
) )
.map_err(pod_registry_err_to_tool)?; .map_err(worker_allocation_err_to_tool)?;
} }
// `start_outcome` covers steps that happen before the child is // `start_outcome` covers steps that happen before the child is
@ -527,7 +527,7 @@ impl SpawnWorkerTool {
fn release_reservation(&self, lock_path: &Path, worker_name: &str) { fn release_reservation(&self, lock_path: &Path, worker_name: &str) {
if let Ok(mut g) = LockFileGuard::open(lock_path) { if let Ok(mut g) = LockFileGuard::open(lock_path) {
let _ = pod_registry::release_worker(&mut g, worker_name); let _ = worker_allocation::release_worker(&mut g, worker_name);
} }
} }
} }
@ -864,7 +864,7 @@ fn spawn_delivery_error(worker_name: &str, err: SendRunError) -> ToolError {
} }
} }
fn pod_registry_err_to_tool(e: ScopeLockError) -> ToolError { fn worker_allocation_err_to_tool(e: ScopeLockError) -> ToolError {
match e { match e {
ScopeLockError::NotSubset { .. } ScopeLockError::NotSubset { .. }
| ScopeLockError::WriteConflict { .. } | ScopeLockError::WriteConflict { .. }

View File

@ -10,7 +10,7 @@ use tracing::{debug, warn};
use crate::discovery::{WeakNotifyDelivery, WorkerDiscovery}; use crate::discovery::{WeakNotifyDelivery, WorkerDiscovery};
use crate::hook::{Hook, HookPostToolAction, PostToolCall, ToolResultSummary}; use crate::hook::{Hook, HookPostToolAction, PostToolCall, ToolResultSummary};
use crate::prompt::catalog::{PromptCatalog, WorkerPrompt}; use crate::prompt::catalog::{PromptCatalog, WorkerPrompt};
use pod_store::WorkerMetadataStore; use session_store::WorkerMetadataStore;
const MAX_TITLE_CHARS: usize = 96; const MAX_TITLE_CHARS: usize = 96;
const MAX_SUMMARY_CHARS: usize = 160; const MAX_SUMMARY_CHARS: usize = 160;
@ -251,11 +251,11 @@ mod tests {
use crate::runtime::dir::RuntimeDir; use crate::runtime::dir::RuntimeDir;
use crate::spawn::registry::SpawnedWorkerRegistry; use crate::spawn::registry::SpawnedWorkerRegistry;
use llm_engine::tool::ToolOutput; use llm_engine::tool::ToolOutput;
use pod_store::FsWorkerStore;
use pod_store::WorkerMetadata;
use protocol::stream::{JsonLineReader, JsonLineWriter}; use protocol::stream::{JsonLineReader, JsonLineWriter};
use protocol::{Event, Method}; use protocol::{Event, Method};
use serde_json::json; use serde_json::json;
use session_store::FsWorkerStore;
use session_store::WorkerMetadata;
use std::sync::Arc; use std::sync::Arc;
use tempfile::tempdir; use tempfile::tempdir;
use ticket::NewTicket; use ticket::NewTicket;

View File

@ -10,13 +10,13 @@ use llm_engine::llm_client::client::LlmClient;
use llm_engine::llm_client::types::Role; use llm_engine::llm_client::types::Role;
use llm_engine::state::Mutable; use llm_engine::state::Mutable;
use llm_engine::{Engine, EngineError, EngineResult, ToolOutputLimits, UsageRecord}; use llm_engine::{Engine, EngineError, EngineResult, ToolOutputLimits, UsageRecord};
use pod_store::{
WorkerActiveSegmentRef, WorkerMetadata, WorkerMetadataStore, WorkerReclaimedChild,
WorkerSpawnedChild, WorkerSpawnedScopeRule, WorkerStoreError,
};
use session_store::{ use session_store::{
LogEntry, SegmentId, SessionId, Store, StoreError, SystemItem, segment_log, to_logged, LogEntry, SegmentId, SessionId, Store, StoreError, SystemItem, segment_log, to_logged,
}; };
use session_store::{
WorkerActiveSegmentRef, WorkerMetadata, WorkerMetadataStore, WorkerReclaimedChild,
WorkerSpawnedChild, WorkerSpawnedScopeRule, WorkerStoreError,
};
use tracing::{info, warn}; use tracing::{info, warn};
use crate::segment_log_sink::SegmentLogSink; use crate::segment_log_sink::SegmentLogSink;
@ -44,7 +44,7 @@ use crate::prompt::catalog::{CatalogError, PromptCatalog};
use crate::prompt::loader::PromptLoader; use crate::prompt::loader::PromptLoader;
use crate::prompt::system::{SystemPromptContext, SystemPromptError, SystemPromptTemplate}; use crate::prompt::system::{SystemPromptContext, SystemPromptError, SystemPromptTemplate};
use crate::runtime::dir; use crate::runtime::dir;
use crate::runtime::pod_registry::{self, ScopeAllocationGuard, ScopeLockError}; use crate::runtime::worker_allocation::{self, ScopeAllocationGuard, ScopeLockError};
use crate::workflow::WorkflowResolveError; use crate::workflow::WorkflowResolveError;
#[cfg(test)] #[cfg(test)]
use async_trait::async_trait; use async_trait::async_trait;
@ -795,7 +795,7 @@ impl<C: LlmClient, St: Store> Worker<C, St> {
/// Strip `revoke` rules from the Worker's runtime scope by adding /// Strip `revoke` rules from the Worker's runtime scope by adding
/// matching deny rules. A `Permission::Write` revoke caps effective /// matching deny rules. A `Permission::Write` revoke caps effective
/// access at `Read` (mirroring the pod-registry `effective_write` /// access at `Read` (mirroring the worker-allocation `effective_write`
/// semantics — Write is the only permission tracked across Workers). /// semantics — Write is the only permission tracked across Workers).
/// A `Permission::Read` revoke removes access entirely. /// A `Permission::Read` revoke removes access entirely.
pub fn revoke_scope_rules( pub fn revoke_scope_rules(
@ -2086,7 +2086,7 @@ impl<C: LlmClient, St: Store> Worker<C, St> {
self.segment_state.set_entries_written(1); self.segment_state.set_entries_written(1);
self.sink.reset_with_initial(entry); self.sink.reset_with_initial(entry);
if self.scope_allocation.is_some() { if self.scope_allocation.is_some() {
pod_registry::update_segment(&self.manifest.worker.name, fork_segment_id)?; worker_allocation::update_segment(&self.manifest.worker.name, fork_segment_id)?;
} }
self.write_worker_metadata_active(SegmentLocation { self.write_worker_metadata_active(SegmentLocation {
session_id: loc.session_id, session_id: loc.session_id,
@ -2795,7 +2795,7 @@ impl<C: LlmClient, St: Store> Worker<C, St> {
// when no allocation is installed (e.g. compact under // when no allocation is installed (e.g. compact under
// `Worker::new` in tests). // `Worker::new` in tests).
if self.scope_allocation.is_some() { if self.scope_allocation.is_some() {
pod_registry::update_segment(&self.manifest.worker.name, new_segment_id)?; worker_allocation::update_segment(&self.manifest.worker.name, new_segment_id)?;
} }
self.write_worker_metadata_active(SegmentLocation { self.write_worker_metadata_active(SegmentLocation {
session_id: old_loc.session_id, session_id: old_loc.session_id,
@ -3847,19 +3847,19 @@ where
// Segment creation is deferred to the first run (see // Segment creation is deferred to the first run (see
// `ensure_segment_head`) so the SegmentStart entry can capture // `ensure_segment_head`) so the SegmentStart entry can capture
// the rendered system prompt, not the raw template source. The // the rendered system prompt, not the raw template source. The
// session_id + segment_id are allocated here so the pod-registry // session_id + segment_id are allocated here so the worker-allocation
// registration can record them from the start. // registration can record them from the start.
let session_id = session_store::new_session_id(); let session_id = session_store::new_session_id();
let segment_id = session_store::new_segment_id(); let segment_id = session_store::new_segment_id();
// Register this Worker in the machine-wide pod-registry // Register this Worker in the machine-wide worker-allocation
// before building anything else, so a spawn that conflicts on // before building anything else, so a spawn that conflicts on
// scope fails fast. // scope fails fast.
let socket_path = dir::default_base() let socket_path = dir::default_base()
.map_err(ScopeLockError::from)? .map_err(ScopeLockError::from)?
.join(&manifest.worker.name) .join(&manifest.worker.name)
.join("sock"); .join("sock");
let scope_allocation = pod_registry::install_top_level( let scope_allocation = worker_allocation::install_top_level(
manifest.worker.name.clone(), manifest.worker.name.clone(),
std::process::id(), std::process::id(),
socket_path, socket_path,
@ -3927,7 +3927,7 @@ where
/// ///
/// Behaves like [`Worker::from_manifest`] but claims the scope /// Behaves like [`Worker::from_manifest`] but claims the scope
/// allocation that the spawner pre-registered via /// allocation that the spawner pre-registered via
/// [`pod_registry::delegate_scope`], rather than installing a new /// [`worker_allocation::delegate_scope`], rather than installing a new
/// top-level entry. `callback_socket` carries the spawner's /// top-level entry. `callback_socket` carries the spawner's
/// Unix-socket path so the spawned Worker can send `Method::Notify` /// Unix-socket path so the spawned Worker can send `Method::Notify`
/// back to the spawner. /// back to the spawner.
@ -3971,7 +3971,7 @@ where
// fresh Session rather than joining the spawner's. // fresh Session rather than joining the spawner's.
let session_id = session_store::new_session_id(); let session_id = session_store::new_session_id();
let segment_id = session_store::new_segment_id(); let segment_id = session_store::new_segment_id();
let scope_allocation = pod_registry::adopt_allocation( let scope_allocation = worker_allocation::adopt_allocation(
manifest.worker.name.clone(), manifest.worker.name.clone(),
std::process::id(), std::process::id(),
segment_id, segment_id,
@ -4104,9 +4104,9 @@ where
/// reuses the same `segment_id` so subsequent turns append to the /// reuses the same `segment_id` so subsequent turns append to the
/// source jsonl as a continuation of the same conversation. /// source jsonl as a continuation of the same conversation.
/// ///
/// Concurrent writers are prevented by the pod-registry: /// Concurrent writers are prevented by the worker-allocation:
/// the registration carries `segment_id`, and this constructor /// the registration carries `segment_id`, and this constructor
/// refuses to start when `pod_registry::lookup_segment` already finds /// refuses to start when `worker_allocation::lookup_segment` already finds
/// a live Worker writing to `segment_id`. So there is no need to fork — /// a live Worker writing to `segment_id`. So there is no need to fork —
/// resume is "the same session, a different process owning it". /// resume is "the same session, a different process owning it".
/// ///
@ -4173,7 +4173,7 @@ where
.map_err(ScopeLockError::from)? .map_err(ScopeLockError::from)?
.join(&manifest.worker.name) .join(&manifest.worker.name)
.join("sock"); .join("sock");
let scope_allocation = pod_registry::install_top_level_with_deny( let scope_allocation = worker_allocation::install_top_level_with_deny(
manifest.worker.name.clone(), manifest.worker.name.clone(),
std::process::id(), std::process::id(),
socket_path, socket_path,
@ -4291,10 +4291,10 @@ where
let delegated_scope = spawned_child_scope_rules(&child); let delegated_scope = spawned_child_scope_rules(&child);
if !delegated_scope.is_empty() { if !delegated_scope.is_empty() {
let lock_path = let lock_path =
pod_registry::default_registry_path().map_err(ScopeLockError::from)?; worker_allocation::default_allocation_path().map_err(ScopeLockError::from)?;
let mut guard = let mut guard = worker_allocation::LockFileGuard::open(&lock_path)
pod_registry::LockFileGuard::open(&lock_path).map_err(ScopeLockError::from)?; .map_err(ScopeLockError::from)?;
pod_registry::reclaim_delegated_scope( worker_allocation::reclaim_delegated_scope(
&mut guard, &mut guard,
&worker_name, &worker_name,
&child.worker_name, &child.worker_name,
@ -5300,7 +5300,7 @@ mod worker_metadata_restore_manifest_tests {
#[test] #[test]
fn metadata_writer_persists_workspace_root_through_store_update() { fn metadata_writer_persists_workspace_root_through_store_update() {
let temp = tempfile::tempdir().unwrap(); let temp = tempfile::tempdir().unwrap();
let store = pod_store::FsWorkerStore::new(temp.path().join("pods")).unwrap(); let store = session_store::FsWorkerStore::new(temp.path().join("workers")).unwrap();
let workspace_root = temp.path().join("workspace-root"); let workspace_root = temp.path().join("workspace-root");
std::fs::create_dir_all(&workspace_root).unwrap(); std::fs::create_dir_all(&workspace_root).unwrap();
let writer = worker_metadata_writer_for_store(&store); let writer = worker_metadata_writer_for_store(&store);

View File

@ -16,8 +16,8 @@ use llm_engine::Engine;
use llm_engine::llm_client::event::{Event as LlmEvent, ResponseStatus, StatusEvent}; use llm_engine::llm_client::event::{Event as LlmEvent, ResponseStatus, StatusEvent};
use llm_engine::llm_client::types::Item; use llm_engine::llm_client::types::Item;
use llm_engine::llm_client::{ClientError, LlmClient, Request}; use llm_engine::llm_client::{ClientError, LlmClient, Request};
use pod_store::{CombinedStore, FsWorkerStore, WorkerMetadataStore};
use protocol::{Event, Method, RunResult}; use protocol::{Event, Method, RunResult};
use session_store::{CombinedStore, FsWorkerStore, WorkerMetadataStore};
use session_store::{FsStore, LogEntry, Store}; use session_store::{FsStore, LogEntry, Store};
use tokio::sync::broadcast; use tokio::sync::broadcast;

View File

@ -26,8 +26,8 @@ use llm_engine::llm_client::{ClientError, LlmClient, Request};
use memory::WorkspaceLayout; use memory::WorkspaceLayout;
use memory::extract::{ExtractedPayload, write_staging}; use memory::extract::{ExtractedPayload, write_staging};
use memory::schema::SourceRef; use memory::schema::SourceRef;
use pod_store::{CombinedStore, FsWorkerStore};
use session_store::FsStore; use session_store::FsStore;
use session_store::{CombinedStore, FsWorkerStore};
type TestStore = CombinedStore<FsStore, FsWorkerStore>; type TestStore = CombinedStore<FsStore, FsWorkerStore>;
use tokio::sync::broadcast; use tokio::sync::broadcast;

View File

@ -9,7 +9,7 @@ use llm_engine::llm_client::event::{ErrorEvent, Event as LlmEvent, ResponseStatu
use llm_engine::llm_client::types::Item; use llm_engine::llm_client::types::Item;
use llm_engine::llm_client::{ClientError, LlmClient, Request}; use llm_engine::llm_client::{ClientError, LlmClient, Request};
use llm_engine::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput}; use llm_engine::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput};
use pod_store::{CombinedStore, FsWorkerStore}; use session_store::{CombinedStore, FsWorkerStore};
use session_store::{FsStore, LogEntry}; use session_store::{FsStore, LogEntry};
use worker::{Event, Method, Worker, WorkerController, WorkerHandle, WorkerManifest, WorkerStatus}; use worker::{Event, Method, Worker, WorkerController, WorkerHandle, WorkerManifest, WorkerStatus};

View File

@ -2,12 +2,12 @@
//! validation paths. //! validation paths.
//! //!
//! These cases all return before `prepare_worker_common` runs, so they //! These cases all return before `prepare_worker_common` runs, so they
//! do not need a real LLM client or pod-registry environment — only the //! do not need a real LLM client or worker-allocation environment — only the
//! session store needs to be present. //! session store needs to be present.
use std::sync::{LazyLock, Mutex}; use std::sync::{LazyLock, Mutex};
use pod_store::{ use session_store::{
CombinedStore, FsWorkerStore, WorkerActiveSegmentRef, WorkerMetadata, WorkerMetadataStore, CombinedStore, FsWorkerStore, WorkerActiveSegmentRef, WorkerMetadata, WorkerMetadataStore,
}; };
use session_store::{FsStore, StoreError}; use session_store::{FsStore, StoreError};

View File

@ -25,8 +25,8 @@ use llm_engine::Engine;
use llm_engine::llm_client::event::{Event as LlmEvent, ResponseStatus, StatusEvent, UsageEvent}; use llm_engine::llm_client::event::{Event as LlmEvent, ResponseStatus, StatusEvent, UsageEvent};
use llm_engine::llm_client::{ClientError, LlmClient, Request}; use llm_engine::llm_client::{ClientError, LlmClient, Request};
use llm_engine::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput}; use llm_engine::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput};
use pod_store::{CombinedStore, FsWorkerStore};
use session_metrics::{DOMAIN, Metric, metrics_from_extensions}; use session_metrics::{DOMAIN, Metric, metrics_from_extensions};
use session_store::{CombinedStore, FsWorkerStore};
use session_store::{FsStore, LogEntry, SegmentId, SessionId, Store, StoreError, TraceEntry}; use session_store::{FsStore, LogEntry, SegmentId, SessionId, Store, StoreError, TraceEntry};
use worker::{Worker, WorkerManifest}; use worker::{Worker, WorkerManifest};

View File

@ -1,6 +1,6 @@
//! Integration tests for the `SpawnWorker` tool. //! Integration tests for the `SpawnWorker` tool.
//! //!
//! These tests exercise the tool's pod-registry delegation, subprocess //! These tests exercise the tool's worker-allocation delegation, subprocess
//! launch, socket handoff, and `spawned_workers.json` write through an injected //! launch, socket handoff, and `spawned_workers.json` write through an injected
//! typed runtime command. The mock command exits immediately while a //! typed runtime command. The mock command exits immediately while a
//! test-owned Unix listener pre-binds the predicted socket path, so the tool //! test-owned Unix listener pre-binds the predicted socket path, so the tool
@ -22,7 +22,7 @@ use std::sync::Arc;
use tempfile::TempDir; use tempfile::TempDir;
use tokio::net::UnixListener; use tokio::net::UnixListener;
use worker::runtime::dir::{RuntimeDir, SpawnedWorkerRecord}; use worker::runtime::dir::{RuntimeDir, SpawnedWorkerRecord};
use worker::runtime::pod_registry::{self, LockFileGuard}; use worker::runtime::worker_allocation::{self, LockFileGuard};
use worker::spawn::registry::SpawnedWorkerRegistry; use worker::spawn::registry::SpawnedWorkerRegistry;
use worker::spawn::tool::spawn_worker_tool_with_runtime_command; use worker::spawn::tool::spawn_worker_tool_with_runtime_command;
@ -67,7 +67,7 @@ async fn setup_spawner(
.unwrap(); .unwrap();
let spawner_socket = spawner_rd.socket_path(); let spawner_socket = spawner_rd.socket_path();
let _guard = pod_registry::install_top_level( let _guard = worker_allocation::install_top_level(
spawner_name.into(), spawner_name.into(),
std::process::id(), std::process::id(),
spawner_socket.clone(), spawner_socket.clone(),
@ -450,8 +450,8 @@ async fn spawn_worker_delegates_scope_and_sends_run() {
other => panic!("expected Run, got {other:?}"), other => panic!("expected Run, got {other:?}"),
} }
// Verify pod_registry has the child allocation under `root`. // Verify worker_allocation has the child allocation under `root`.
let lock_path = pod_registry::default_registry_path().unwrap(); let lock_path = worker_allocation::default_allocation_path().unwrap();
let guard = LockFileGuard::open(&lock_path).unwrap(); let guard = LockFileGuard::open(&lock_path).unwrap();
let child = guard let child = guard
.data() .data()
@ -651,7 +651,7 @@ async fn spawn_worker_rejects_scope_outside_spawner() {
} }
// The spawner's allocation is unchanged; no "child" appeared. // The spawner's allocation is unchanged; no "child" appeared.
let lock_path = pod_registry::default_registry_path().unwrap(); let lock_path = worker_allocation::default_allocation_path().unwrap();
let guard = LockFileGuard::open(&lock_path).unwrap(); let guard = LockFileGuard::open(&lock_path).unwrap();
assert!(guard.data().find("child").is_none()); assert!(guard.data().find("child").is_none());
@ -724,7 +724,7 @@ async fn spawn_worker_rolls_back_reservation_when_socket_never_appears() {
} }
// Rollback assertion: the reserved "ghost" allocation is gone. // Rollback assertion: the reserved "ghost" allocation is gone.
let lock_path = pod_registry::default_registry_path().unwrap(); let lock_path = worker_allocation::default_allocation_path().unwrap();
let guard = LockFileGuard::open(&lock_path).unwrap(); let guard = LockFileGuard::open(&lock_path).unwrap();
assert!( assert!(
guard.data().find("ghost").is_none(), guard.data().find("ghost").is_none(),

View File

@ -8,7 +8,7 @@ use futures::Stream;
use llm_engine::Engine; use llm_engine::Engine;
use llm_engine::llm_client::event::{Event as LlmEvent, ResponseStatus, StatusEvent}; use llm_engine::llm_client::event::{Event as LlmEvent, ResponseStatus, StatusEvent};
use llm_engine::llm_client::{ClientError, LlmClient, Request}; use llm_engine::llm_client::{ClientError, LlmClient, Request};
use pod_store::{CombinedStore, FsWorkerStore}; use session_store::{CombinedStore, FsWorkerStore};
use session_store::{FsStore, LogEntry, Store}; use session_store::{FsStore, LogEntry, Store};
use worker::{PromptLoader, SystemPromptTemplate, Worker, WorkerError}; use worker::{PromptLoader, SystemPromptTemplate, Worker, WorkerError};

View File

@ -14,17 +14,17 @@ use std::sync::{Arc, LazyLock, Mutex};
use llm_engine::llm_client::types::{ContentPart, Item, Role}; use llm_engine::llm_client::types::{ContentPart, Item, Role};
use llm_engine::tool::ToolOutput; use llm_engine::tool::ToolOutput;
use manifest::{Permission, Scope, ScopeRule, SharedScope}; use manifest::{Permission, Scope, ScopeRule, SharedScope};
use pod_store::{CombinedStore, FsWorkerStore, WorkerMetadataStore};
use protocol::stream::{JsonLineReader, JsonLineWriter}; use protocol::stream::{JsonLineReader, JsonLineWriter};
use protocol::{ErrorCode, Event, Greeting, Method}; use protocol::{ErrorCode, Event, Greeting, Method};
use serde_json::json; use serde_json::json;
use session_store::FsStore; use session_store::FsStore;
use session_store::{CombinedStore, FsWorkerStore, WorkerMetadataStore};
use tempfile::TempDir; use tempfile::TempDir;
use tokio::net::UnixListener; use tokio::net::UnixListener;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use worker::runtime::dir::{RuntimeDir, SpawnedWorkerRecord}; use worker::runtime::dir::{RuntimeDir, SpawnedWorkerRecord};
use worker::runtime::pod_registry::{self, LockFileGuard}; use worker::runtime::worker_allocation::{self, LockFileGuard};
use worker::spawn::comm_tools::{read_worker_output_tool, send_to_worker_tool, stop_worker_tool}; use worker::spawn::comm_tools::{read_worker_output_tool, send_to_worker_tool, stop_worker_tool};
use worker::spawn::registry::SpawnedWorkerRegistry; use worker::spawn::registry::SpawnedWorkerRegistry;
@ -416,7 +416,7 @@ async fn stop_worker_sends_shutdown_and_releases_scope() {
permission: Permission::Write, permission: Permission::Write,
recursive: true, recursive: true,
}; };
pod_registry::register_worker_with_deny( worker_allocation::register_worker_with_deny(
&mut g, &mut g,
"spawner".into(), "spawner".into(),
std::process::id(), std::process::id(),
@ -426,7 +426,7 @@ async fn stop_worker_sends_shutdown_and_releases_scope() {
session_store::new_segment_id(), session_store::new_segment_id(),
) )
.unwrap(); .unwrap();
pod_registry::register_worker( worker_allocation::register_worker(
&mut g, &mut g,
"child".into(), "child".into(),
std::process::id(), std::process::id(),
@ -663,7 +663,7 @@ async fn load_from_worker_state_reclaims_missing_child_scope_and_records_history
{ {
let mut g = LockFileGuard::open(&runtime_tmp.path().join("workers.json")).unwrap(); let mut g = LockFileGuard::open(&runtime_tmp.path().join("workers.json")).unwrap();
pod_registry::register_worker_with_deny( worker_allocation::register_worker_with_deny(
&mut g, &mut g,
"spawner".into(), "spawner".into(),
std::process::id(), std::process::id(),

View File

@ -15,7 +15,7 @@ use tempfile::TempDir;
use tokio::net::UnixListener; use tokio::net::UnixListener;
use worker::ipc::event::{apply_event_side_effects, fire_and_forget, render_event}; use worker::ipc::event::{apply_event_side_effects, fire_and_forget, render_event};
use worker::runtime::dir::{RuntimeDir, SpawnedWorkerRecord}; use worker::runtime::dir::{RuntimeDir, SpawnedWorkerRecord};
use worker::runtime::pod_registry::{self, LockFileGuard}; use worker::runtime::worker_allocation::{self, LockFileGuard};
use worker::spawn::registry::SpawnedWorkerRegistry; use worker::spawn::registry::SpawnedWorkerRegistry;
/// Serialises tests that mutate `YOI_RUNTIME_DIR`. /// Serialises tests that mutate `YOI_RUNTIME_DIR`.
@ -62,7 +62,7 @@ impl Drop for EnvGuard {
} }
} }
/// Point `YOI_RUNTIME_DIR` at `dir`. The pod-registry then lives at /// Point `YOI_RUNTIME_DIR` at `dir`. The worker-allocation then lives at
/// `<dir>/workers.json` and Worker runtime sub-dirs at `<dir>/{worker_name}/`. /// `<dir>/workers.json` and Worker runtime sub-dirs at `<dir>/{worker_name}/`.
fn set_runtime_dir(dir: &std::path::Path) { fn set_runtime_dir(dir: &std::path::Path) {
unsafe { unsafe {
@ -380,7 +380,7 @@ async fn shutdown_releases_scope_allocation_when_present() {
// Install a top-level allocation for "kid" so ShutDown has // Install a top-level allocation for "kid" so ShutDown has
// something to release. // something to release.
let guard = pod_registry::install_top_level( let guard = worker_allocation::install_top_level(
"kid".into(), "kid".into(),
std::process::id(), std::process::id(),
"/tmp/kid.sock".into(), "/tmp/kid.sock".into(),
@ -412,7 +412,7 @@ async fn shutdown_releases_scope_allocation_when_present() {
) )
.await; .await;
// Allocation is gone from the pod-registry. // Allocation is gone from the worker-allocation.
let g = LockFileGuard::open(&lock_path).unwrap(); let g = LockFileGuard::open(&lock_path).unwrap();
assert!( assert!(
g.data().find("kid").is_none(), g.data().find("kid").is_none(),

View File

@ -15,7 +15,6 @@ client = { workspace = true }
memory = { workspace = true } memory = { workspace = true }
manifest = { workspace = true } manifest = { workspace = true }
worker = { workspace = true } worker = { workspace = true }
pod-store = { workspace = true }
session-store = { workspace = true } session-store = { workspace = true }
session-analytics = { workspace = true } session-analytics = { workspace = true }
ticket = { workspace = true } ticket = { workspace = true }

View File

@ -4,8 +4,8 @@ use std::path::PathBuf;
use std::time::{Duration, SystemTime}; use std::time::{Duration, SystemTime};
use manifest::paths; use manifest::paths;
use pod_store::{FsWorkerStore, WorkerMetadataStore};
use session_store::{FsStore, SessionId, Store}; use session_store::{FsStore, SessionId, Store};
use session_store::{FsWorkerStore, WorkerMetadataStore};
use crate::worker_cleanup_cli::parse_duration; use crate::worker_cleanup_cli::parse_duration;
@ -203,8 +203,8 @@ pub fn run_prune_with_roots(
)); ));
} }
let session_store = FsStore::new(data_dir.join("sessions")).map_err(to_error)?; let session_store = FsStore::new(data_dir.join("sessions")).map_err(to_error)?;
let pod_store = FsWorkerStore::new(data_dir.join("pods")).map_err(to_error)?; let worker_metadata_store = FsWorkerStore::new(data_dir.join("workers")).map_err(to_error)?;
let referenced_sessions = referenced_sessions(&pod_store)?; let referenced_sessions = referenced_sessions(&worker_metadata_store)?;
let cutoff = options let cutoff = options
.older_than .older_than
.map(|older_than| { .map(|older_than| {
@ -315,10 +315,12 @@ pub fn run_prune_with_roots(
}) })
} }
fn referenced_sessions(pod_store: &FsWorkerStore) -> Result<BTreeSet<SessionId>, SessionCliError> { fn referenced_sessions(
worker_metadata_store: &FsWorkerStore,
) -> Result<BTreeSet<SessionId>, SessionCliError> {
let mut sessions = BTreeSet::new(); let mut sessions = BTreeSet::new();
for name in pod_store.list_names().map_err(to_error)? { for name in worker_metadata_store.list_names().map_err(to_error)? {
let metadata = pod_store let metadata = worker_metadata_store
.read_by_name(&name) .read_by_name(&name)
.map_err(to_error)? .map_err(to_error)?
.ok_or_else(|| { .ok_or_else(|| {
@ -358,8 +360,8 @@ pub fn help_text() -> &'static str {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use pod_store::{WorkerActiveSegmentRef, WorkerMetadata};
use session_store::{Store, new_segment_id, new_session_id}; use session_store::{Store, new_segment_id, new_session_id};
use session_store::{WorkerActiveSegmentRef, WorkerMetadata};
use std::io::Write; use std::io::Write;
#[test] #[test]
@ -441,7 +443,7 @@ mod tests {
let tmp = tempfile::TempDir::new().unwrap(); let tmp = tempfile::TempDir::new().unwrap();
let data_dir = tmp.path().join("data"); let data_dir = tmp.path().join("data");
let session_store = FsStore::new(data_dir.join("sessions")).unwrap(); let session_store = FsStore::new(data_dir.join("sessions")).unwrap();
let pod_store = FsWorkerStore::new(data_dir.join("pods")).unwrap(); let worker_metadata_store = FsWorkerStore::new(data_dir.join("workers")).unwrap();
let referenced_session = new_session_id(); let referenced_session = new_session_id();
let referenced_segment = new_segment_id(); let referenced_segment = new_segment_id();
let orphan_session = new_session_id(); let orphan_session = new_session_id();
@ -452,7 +454,7 @@ mod tests {
session_store session_store
.create_segment(orphan_session, orphan_segment, &[]) .create_segment(orphan_session, orphan_segment, &[])
.unwrap(); .unwrap();
pod_store worker_metadata_store
.write(&WorkerMetadata::new( .write(&WorkerMetadata::new(
"agent", "agent",
Some(WorkerActiveSegmentRef::active_segment( Some(WorkerActiveSegmentRef::active_segment(

View File

@ -4,7 +4,7 @@ use std::path::{Path, PathBuf};
use std::time::{Duration, SystemTime}; use std::time::{Duration, SystemTime};
use manifest::paths; use manifest::paths;
use pod_store::{FsWorkerStore, WorkerMetadata, WorkerMetadataStore, validate_worker_name}; use session_store::{FsWorkerStore, WorkerMetadata, WorkerMetadataStore, validate_worker_name};
const MAX_REPORT_ITEMS: usize = 50; const MAX_REPORT_ITEMS: usize = 50;
@ -234,12 +234,12 @@ async fn run_delete(
data_dir: PathBuf, data_dir: PathBuf,
runtime_dir: PathBuf, runtime_dir: PathBuf,
) -> Result<WorkerCleanupCliOutput, WorkerCleanupCliError> { ) -> Result<WorkerCleanupCliOutput, WorkerCleanupCliError> {
let store = FsWorkerStore::new(data_dir.join("pods")).map_err(to_error)?; let store = FsWorkerStore::new(data_dir.join("workers")).map_err(to_error)?;
let metadata = store.read_by_name(&options.name).map_err(to_error)?; let metadata = store.read_by_name(&options.name).map_err(to_error)?;
let Some(metadata) = metadata else { let Some(metadata) = metadata else {
return Ok(WorkerCleanupCliOutput { return Ok(WorkerCleanupCliOutput {
stdout: format!( stdout: format!(
"yoi worker delete\nstatus: refused\npod: {}\nreason: worker metadata is missing\n", "yoi worker delete\nstatus: refused\nworker: {}\nreason: worker metadata is missing\n",
options.name options.name
), ),
status: WorkerCleanupCliStatus::Failure, status: WorkerCleanupCliStatus::Failure,
@ -250,7 +250,7 @@ async fn run_delete(
if let Some(reason) = probe.refusal_reason() { if let Some(reason) = probe.refusal_reason() {
return Ok(WorkerCleanupCliOutput { return Ok(WorkerCleanupCliOutput {
stdout: format!( stdout: format!(
"yoi worker delete\nstatus: refused\npod: {}\nreason: {}\nsocket: {}\n", "yoi worker delete\nstatus: refused\nworker: {}\nreason: {}\nsocket: {}\n",
options.name, options.name,
reason, reason,
probe.socket_path.display() probe.socket_path.display()
@ -290,7 +290,7 @@ async fn run_prune(
data_dir: PathBuf, data_dir: PathBuf,
runtime_dir: PathBuf, runtime_dir: PathBuf,
) -> Result<WorkerCleanupCliOutput, WorkerCleanupCliError> { ) -> Result<WorkerCleanupCliOutput, WorkerCleanupCliError> {
let store = FsWorkerStore::new(data_dir.join("pods")).map_err(to_error)?; let store = FsWorkerStore::new(data_dir.join("workers")).map_err(to_error)?;
let names = store.list_names().map_err(to_error)?; let names = store.list_names().map_err(to_error)?;
let cutoff = SystemTime::now() let cutoff = SystemTime::now()
.checked_sub(options.older_than) .checked_sub(options.older_than)
@ -498,7 +498,7 @@ fn prune_help_text() -> &'static str {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use pod_store::WorkerActiveSegmentRef; use session_store::WorkerActiveSegmentRef;
use session_store::{Store, new_segment_id, new_session_id}; use session_store::{Store, new_segment_id, new_session_id};
fn string_args(args: &[&str]) -> Vec<String> { fn string_args(args: &[&str]) -> Vec<String> {
@ -543,14 +543,14 @@ mod tests {
let tmp = tempfile::TempDir::new().unwrap(); let tmp = tempfile::TempDir::new().unwrap();
let data_dir = tmp.path().join("data"); let data_dir = tmp.path().join("data");
let runtime_dir = tmp.path().join("run"); let runtime_dir = tmp.path().join("run");
let pod_store = FsWorkerStore::new(data_dir.join("pods")).unwrap(); let worker_metadata_store = FsWorkerStore::new(data_dir.join("workers")).unwrap();
let session_store = session_store::FsStore::new(data_dir.join("sessions")).unwrap(); let session_store = session_store::FsStore::new(data_dir.join("sessions")).unwrap();
let session_id = new_session_id(); let session_id = new_session_id();
let segment_id = new_segment_id(); let segment_id = new_segment_id();
session_store session_store
.create_segment(session_id, segment_id, &[]) .create_segment(session_id, segment_id, &[])
.unwrap(); .unwrap();
pod_store worker_metadata_store
.write(&WorkerMetadata::new( .write(&WorkerMetadata::new(
"agent", "agent",
Some(WorkerActiveSegmentRef::active_segment( Some(WorkerActiveSegmentRef::active_segment(
@ -573,7 +573,12 @@ mod tests {
assert_eq!(output.status, WorkerCleanupCliStatus::Success); assert_eq!(output.status, WorkerCleanupCliStatus::Success);
assert!(output.stdout.contains("deleted: worker metadata")); assert!(output.stdout.contains("deleted: worker metadata"));
assert!(pod_store.read_by_name("agent").unwrap().is_none()); assert!(
worker_metadata_store
.read_by_name("agent")
.unwrap()
.is_none()
);
assert!(session_store.exists(session_id, segment_id).unwrap()); assert!(session_store.exists(session_id, segment_id).unwrap());
} }
@ -582,8 +587,8 @@ mod tests {
let tmp = tempfile::TempDir::new().unwrap(); let tmp = tempfile::TempDir::new().unwrap();
let data_dir = tmp.path().join("data"); let data_dir = tmp.path().join("data");
let runtime_dir = tmp.path().join("run"); let runtime_dir = tmp.path().join("run");
let pod_store = FsWorkerStore::new(data_dir.join("pods")).unwrap(); let worker_metadata_store = FsWorkerStore::new(data_dir.join("workers")).unwrap();
pod_store worker_metadata_store
.write(&WorkerMetadata::new("agent", None)) .write(&WorkerMetadata::new("agent", None))
.unwrap(); .unwrap();
@ -601,7 +606,12 @@ mod tests {
assert_eq!(output.status, WorkerCleanupCliStatus::Success); assert_eq!(output.status, WorkerCleanupCliStatus::Success);
assert!(output.stdout.contains("mode: dry-run")); assert!(output.stdout.contains("mode: dry-run"));
assert!(pod_store.read_by_name("agent").unwrap().is_some()); assert!(
worker_metadata_store
.read_by_name("agent")
.unwrap()
.is_some()
);
} }
#[cfg(unix)] #[cfg(unix)]
@ -612,8 +622,8 @@ mod tests {
let tmp = tempfile::TempDir::new().unwrap(); let tmp = tempfile::TempDir::new().unwrap();
let data_dir = tmp.path().join("data"); let data_dir = tmp.path().join("data");
let runtime_dir = tmp.path().join("run"); let runtime_dir = tmp.path().join("run");
let pod_store = FsWorkerStore::new(data_dir.join("pods")).unwrap(); let worker_metadata_store = FsWorkerStore::new(data_dir.join("workers")).unwrap();
pod_store worker_metadata_store
.write(&WorkerMetadata::new("agent", None)) .write(&WorkerMetadata::new("agent", None))
.unwrap(); .unwrap();
std::fs::create_dir_all(runtime_dir.join("agent")).unwrap(); std::fs::create_dir_all(runtime_dir.join("agent")).unwrap();
@ -634,6 +644,11 @@ mod tests {
drop(listener); drop(listener);
assert_eq!(output.status, WorkerCleanupCliStatus::Failure); assert_eq!(output.status, WorkerCleanupCliStatus::Failure);
assert!(output.stdout.contains("status: refused")); assert!(output.stdout.contains("status: refused"));
assert!(pod_store.read_by_name("agent").unwrap().is_some()); assert!(
worker_metadata_store
.read_by_name("agent")
.unwrap()
.is_some()
);
} }
} }

View File

@ -43,7 +43,7 @@ rustPlatform.buildRustPackage rec {
filter = sourceFilter; filter = sourceFilter;
}; };
cargoHash = "sha256-9F60cIVhRTct8sK11xoqOVA4rLd5Ba76Vi7+Y2NFrRo="; cargoHash = "sha256-9e99NfbErWlmyZqXd7d5UaJ88gx6ENbHOubqYtnjXVg=";
depsExtraArgs = { depsExtraArgs = {
# Older fetchCargoVendor utilities used crates.io's API download endpoint, # Older fetchCargoVendor utilities used crates.io's API download endpoint,

View File

@ -1350,8 +1350,9 @@ impl FixtureWorkspace {
}; };
fixture.write_fixture_metadata("created", None)?; fixture.write_fixture_metadata("created", None)?;
write_blocking_pod_metadata(&fixture.xdg_data_home, "workspace")?; let worker_metadata_root = active_worker_metadata_root(&fixture.home);
write_blocking_pod_metadata(&fixture.xdg_data_home, "workspace-orchestrator")?; write_blocking_worker_metadata(&worker_metadata_root, "workspace")?;
write_blocking_worker_metadata(&worker_metadata_root, "workspace-orchestrator")?;
run_yoi( run_yoi(
binary, binary,
&fixture.workspace, &fixture.workspace,
@ -1621,8 +1622,8 @@ impl FixtureWorkspace {
"host_runtime_inherited": false, "host_runtime_inherited": false,
"host_xdg_runtime_dir_present": std::env::var_os("XDG_RUNTIME_DIR").is_some(), "host_xdg_runtime_dir_present": std::env::var_os("XDG_RUNTIME_DIR").is_some(),
"tested_yoi_runtime_source": "fixture XDG_RUNTIME_DIR", "tested_yoi_runtime_source": "fixture XDG_RUNTIME_DIR",
"tested_yoi_pod_registry": self.xdg_runtime_dir.join("yoi").join("pods.json"), "tested_yoi_worker_allocation": self.xdg_runtime_dir.join("yoi").join("workers.json"),
"fixture_pod_metadata_root": self.xdg_data_home.join("yoi").join("pods") "fixture_worker_metadata_root": active_worker_metadata_root(&self.home)
}, },
"tested_yoi_env_policy": tested_yoi_env_policy_overview(), "tested_yoi_env_policy": tested_yoi_env_policy_overview(),
"cleanup": cleanup, "cleanup": cleanup,
@ -1997,8 +1998,12 @@ fn copy_dir_recursive(source: &Path, destination: &Path) -> Result<()> {
Ok(()) Ok(())
} }
fn write_blocking_pod_metadata(data_home: &Path, worker_name: &str) -> Result<()> { fn active_worker_metadata_root(home: &Path) -> PathBuf {
let dir = data_home.join("yoi").join("pods").join(worker_name); home.join(".yoi").join("workers")
}
fn write_blocking_worker_metadata(worker_metadata_root: &Path, worker_name: &str) -> Result<()> {
let dir = worker_metadata_root.join(worker_name);
fs::create_dir_all(&dir)?; fs::create_dir_all(&dir)?;
fs::write(dir.join("metadata.json"), b"not valid metadata for e2e\n")?; fs::write(dir.join("metadata.json"), b"not valid metadata for e2e\n")?;
Ok(()) Ok(())