diff --git a/Cargo.lock b/Cargo.lock index 35c287d4..f2f468eb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5906,6 +5906,7 @@ name = "worker-runtime" version = "0.1.0" dependencies = [ "serde", + "serde_json", "thiserror 2.0.18", ] diff --git a/crates/worker-runtime/Cargo.toml b/crates/worker-runtime/Cargo.toml index e9b53282..aca3ecb1 100644 --- a/crates/worker-runtime/Cargo.toml +++ b/crates/worker-runtime/Cargo.toml @@ -5,6 +5,11 @@ version = "0.1.0" edition.workspace = true license.workspace = true +[features] +default = [] +fs-store = ["dep:serde_json"] + [dependencies] serde = { workspace = true, features = ["derive"] } +serde_json = { workspace = true, optional = true } thiserror = { workspace = true } diff --git a/crates/worker-runtime/src/error.rs b/crates/worker-runtime/src/error.rs index 41c2e7ec..76b1c6b4 100644 --- a/crates/worker-runtime/src/error.rs +++ b/crates/worker-runtime/src/error.rs @@ -1,4 +1,5 @@ use crate::identity::{RuntimeId, WorkerId}; +use std::path::PathBuf; /// Errors returned by the embedded Runtime API. 
#[derive(Debug, thiserror::Error)] @@ -33,6 +34,27 @@ pub enum RuntimeError { #[error("invalid request: {0}")] InvalidRequest(String), + #[error("runtime store {operation} failed at {}: {source}", path.display())] + StoreIo { + operation: &'static str, + path: PathBuf, + #[source] + source: std::io::Error, + }, + + #[error("runtime store {operation} missing data at {}", path.display())] + StoreMissing { + operation: &'static str, + path: PathBuf, + }, + + #[error("runtime store {operation} found corrupt data at {}: {message}", path.display())] + StoreCorrupt { + operation: &'static str, + path: PathBuf, + message: String, + }, + #[error("runtime state lock was poisoned")] StatePoisoned, } diff --git a/crates/worker-runtime/src/fs_store.rs b/crates/worker-runtime/src/fs_store.rs new file mode 100644 index 00000000..54f1c41c --- /dev/null +++ b/crates/worker-runtime/src/fs_store.rs @@ -0,0 +1,755 @@ +use crate::catalog::{CreateWorkerRequest, WorkerStatus}; +use crate::diagnostics::RuntimeDiagnostic; +use crate::error::RuntimeError; +use crate::identity::{RuntimeId, WorkerId, WorkerRef}; +use crate::management::{RuntimeBackendKind, RuntimeLimits, RuntimeStatus}; +use crate::observation::{ + EventCursor, RuntimeEvent, RuntimeEventBatch, TranscriptEntry, TranscriptProjection, + TranscriptQuery, +}; +use serde::{Deserialize, Serialize}; +use std::collections::BTreeMap; +use std::fs::{self, File, OpenOptions}; +use std::io::{BufRead, BufReader, Write}; +use std::path::{Path, PathBuf}; +use std::sync::atomic::{AtomicU64, Ordering}; + +const SCHEMA_VERSION: u32 = 1; +const RUNTIMES_DIR: &str = "runtimes"; +const RUNTIME_FILE: &str = "runtime.json"; +const EVENTS_FILE: &str = "events.jsonl"; +const WORKERS_DIR: &str = "workers"; +const WORKER_FILE: &str = "worker.json"; +const TRANSCRIPT_FILE: &str = "transcript.jsonl"; + +static NEXT_TMP_SEQUENCE: AtomicU64 = AtomicU64::new(1); + +/// Options for constructing a filesystem-backed Runtime store. 
+#[derive(Clone, Debug, PartialEq, Eq)] +pub struct FsRuntimeStoreOptions { + /// Root directory containing all Runtime-scoped store data. + pub root: PathBuf, + pub runtime_id: Option, + pub display_name: Option, + pub limits: RuntimeLimits, +} + +impl FsRuntimeStoreOptions { + pub fn new(root: impl Into) -> Self { + Self { + root: root.into(), + runtime_id: None, + display_name: None, + limits: RuntimeLimits::default(), + } + } +} + +/// Filesystem persistence boundary for Worker Runtime state. +/// +/// Authority is the typed `runtime_id + worker_id` pair. Those ids are encoded +/// into path components only after validation; legacy pod paths, socket paths, +/// and session paths are deliberately not part of the layout or lookup API. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct FsRuntimeStore { + root: PathBuf, + runtime_id: RuntimeId, + runtime_dir: PathBuf, +} + +impl FsRuntimeStore { + pub fn root(&self) -> &Path { + &self.root + } + + pub fn runtime_id(&self) -> &RuntimeId { + &self.runtime_id + } + + pub fn runtime_dir(&self) -> &Path { + &self.runtime_dir + } + + /// Read persisted Runtime events directly from the event log with the same + /// bounded cursor semantics as [`crate::Runtime::read_events`]. 
+ pub fn read_events( + &self, + cursor: &EventCursor, + limit: usize, + max_limit: usize, + ) -> Result { + if cursor.runtime_id != self.runtime_id { + return Err(RuntimeError::WrongRuntimeCursor { + expected_runtime_id: self.runtime_id.clone(), + actual_runtime_id: cursor.runtime_id.clone(), + }); + } + if limit > max_limit { + return Err(RuntimeError::LimitTooLarge { + requested: limit, + max: max_limit, + }); + } + + let events = read_json_lines::(&self.events_path(), "read events")?; + let mut selected = Vec::new(); + for event in events + .iter() + .filter(|event| event.id >= cursor.next_event_id) + .take(limit) + { + selected.push(event.clone()); + } + let next_event_id = selected + .last() + .map(|event| event.id + 1) + .unwrap_or(cursor.next_event_id); + let has_more = events.iter().any(|event| event.id >= next_event_id); + + Ok(RuntimeEventBatch { + runtime_id: self.runtime_id.clone(), + cursor: EventCursor { + runtime_id: self.runtime_id.clone(), + next_event_id, + }, + events: selected, + has_more, + }) + } + + /// Read a persisted Worker transcript directly from its Worker-scoped log. 
+ pub fn read_transcript( + &self, + worker_ref: &WorkerRef, + query: TranscriptQuery, + max_limit: usize, + ) -> Result { + self.ensure_worker_ref(worker_ref)?; + if query.limit > max_limit { + return Err(RuntimeError::LimitTooLarge { + requested: query.limit, + max: max_limit, + }); + } + + let path = self.transcript_path(&worker_ref.worker_id); + let entries = read_json_lines::(&path, "read transcript")?; + let total_items = entries.len(); + let end = query.start.saturating_add(query.limit).min(total_items); + let items = if query.start >= total_items { + Vec::new() + } else { + entries[query.start..end].to_vec() + }; + let next_start = (end < total_items).then_some(end); + + Ok(TranscriptProjection { + worker_ref: worker_ref.clone(), + start: query.start, + limit: query.limit, + total_items, + items, + next_start, + }) + } + + pub(crate) fn open_or_create( + root: PathBuf, + runtime_id: RuntimeId, + ) -> Result { + fs::create_dir_all(root.join(RUNTIMES_DIR)).map_err(|source| RuntimeError::StoreIo { + operation: "create store root", + path: root.join(RUNTIMES_DIR), + source, + })?; + + let runtime_dir = runtime_dir(&root, &runtime_id); + let existed = runtime_dir.exists(); + if existed && !runtime_dir.is_dir() { + return Err(RuntimeError::StoreCorrupt { + operation: "open runtime store", + path: runtime_dir, + message: "runtime path exists but is not a directory".to_string(), + }); + } + + fs::create_dir_all(runtime_dir.join(WORKERS_DIR)).map_err(|source| { + RuntimeError::StoreIo { + operation: "create runtime store", + path: runtime_dir.join(WORKERS_DIR), + source, + } + })?; + + let store = Self { + root, + runtime_id, + runtime_dir, + }; + let state = if existed { + Some(store.load_runtime_state()?) 
+ } else { + None + }; + Ok(OpenedFsRuntimeStore { store, state }) + } + + pub(crate) fn write_runtime_snapshot( + &self, + state: &PersistedRuntimeState, + ) -> Result<(), RuntimeError> { + let snapshot = RuntimeSnapshot::from_persisted(state); + atomic_write_json(&self.runtime_path(), &snapshot, "write runtime snapshot") + } + + pub(crate) fn write_worker_snapshot( + &self, + worker: &PersistedWorkerRecord, + ) -> Result<(), RuntimeError> { + self.ensure_worker_ref(&worker.worker_ref)?; + let worker_dir = self.worker_dir(&worker.worker_id); + fs::create_dir_all(&worker_dir).map_err(|source| RuntimeError::StoreIo { + operation: "create worker store", + path: worker_dir.clone(), + source, + })?; + atomic_write_json( + &worker_dir.join(WORKER_FILE), + &WorkerSnapshot::from_persisted(worker), + "write worker snapshot", + )?; + ensure_file_exists(&worker_dir.join(TRANSCRIPT_FILE), "create transcript log") + } + + pub(crate) fn append_event(&self, event: &RuntimeEvent) -> Result<(), RuntimeError> { + if let Some(worker_ref) = &event.worker_ref { + self.ensure_worker_ref(worker_ref)?; + } + append_json_line(&self.events_path(), event, "append event") + } + + pub(crate) fn append_transcript_entry( + &self, + entry: &TranscriptEntry, + ) -> Result<(), RuntimeError> { + self.ensure_worker_ref(&entry.worker_ref)?; + append_json_line( + &self.transcript_path(&entry.worker_ref.worker_id), + entry, + "append transcript", + ) + } + + pub(crate) fn load_runtime_state(&self) -> Result { + let runtime_path = self.runtime_path(); + let snapshot: RuntimeSnapshot = read_json(&runtime_path, "read runtime snapshot")?; + snapshot.validate(&self.runtime_id, &runtime_path)?; + + let events = read_json_lines::(&self.events_path(), "read events")?; + let workers_dir = self.runtime_dir.join(WORKERS_DIR); + if !workers_dir.exists() { + return Err(RuntimeError::StoreMissing { + operation: "read workers", + path: workers_dir, + }); + } + if !workers_dir.is_dir() { + return 
Err(RuntimeError::StoreCorrupt { + operation: "read workers", + path: workers_dir, + message: "workers path exists but is not a directory".to_string(), + }); + } + + let mut workers = BTreeMap::new(); + let mut worker_dirs = fs::read_dir(&workers_dir) + .map_err(|source| RuntimeError::StoreIo { + operation: "read workers", + path: workers_dir.clone(), + source, + })? + .collect::, _>>() + .map_err(|source| RuntimeError::StoreIo { + operation: "read workers", + path: workers_dir.clone(), + source, + })?; + worker_dirs.sort_by_key(|entry| entry.path()); + + for entry in worker_dirs { + let path = entry.path(); + if !path.is_dir() { + return Err(RuntimeError::StoreCorrupt { + operation: "read worker", + path, + message: "worker path exists but is not a directory".to_string(), + }); + } + let worker_snapshot_path = path.join(WORKER_FILE); + let worker_snapshot: WorkerSnapshot = + read_json(&worker_snapshot_path, "read worker snapshot")?; + worker_snapshot.validate(&self.runtime_id, &worker_snapshot_path)?; + let transcript = + read_json_lines::(&path.join(TRANSCRIPT_FILE), "read transcript")?; + for entry in &transcript { + self.ensure_worker_ref(&entry.worker_ref)?; + if entry.worker_ref.worker_id != worker_snapshot.worker_id { + return Err(RuntimeError::StoreCorrupt { + operation: "read transcript", + path: path.join(TRANSCRIPT_FILE), + message: format!( + "transcript entry belongs to worker {}, expected {}", + entry.worker_ref.worker_id, worker_snapshot.worker_id + ), + }); + } + } + let worker = worker_snapshot.into_persisted(transcript); + if workers.insert(worker.worker_id.clone(), worker).is_some() { + return Err(RuntimeError::StoreCorrupt { + operation: "read workers", + path: workers_dir.clone(), + message: "duplicate worker id in store".to_string(), + }); + } + } + + Ok(snapshot.into_persisted(events, workers)) + } + + fn ensure_worker_ref(&self, worker_ref: &WorkerRef) -> Result<(), RuntimeError> { + if worker_ref.runtime_id == self.runtime_id { + Ok(()) + } 
else { + Err(RuntimeError::WrongRuntime { + expected_runtime_id: self.runtime_id.clone(), + actual_runtime_id: worker_ref.runtime_id.clone(), + worker_id: worker_ref.worker_id.clone(), + }) + } + } + + fn runtime_path(&self) -> PathBuf { + self.runtime_dir.join(RUNTIME_FILE) + } + + fn events_path(&self) -> PathBuf { + self.runtime_dir.join(EVENTS_FILE) + } + + fn worker_dir(&self, worker_id: &WorkerId) -> PathBuf { + self.runtime_dir + .join(WORKERS_DIR) + .join(encoded_component(worker_id.as_str())) + } + + fn transcript_path(&self, worker_id: &WorkerId) -> PathBuf { + self.worker_dir(worker_id).join(TRANSCRIPT_FILE) + } +} + +#[derive(Debug)] +pub(crate) struct OpenedFsRuntimeStore { + pub(crate) store: FsRuntimeStore, + pub(crate) state: Option, +} + +#[derive(Clone, Debug)] +pub(crate) struct PersistedRuntimeState { + pub(crate) runtime_id: RuntimeId, + pub(crate) display_name: Option, + pub(crate) status: RuntimeStatus, + pub(crate) limits: RuntimeLimits, + pub(crate) next_worker_sequence: u64, + pub(crate) next_event_id: u64, + pub(crate) next_diagnostic_id: u64, + pub(crate) workers: BTreeMap, + pub(crate) events: Vec, + pub(crate) diagnostics: Vec, +} + +#[derive(Clone, Debug)] +pub(crate) struct PersistedWorkerRecord { + pub(crate) worker_ref: WorkerRef, + pub(crate) worker_id: WorkerId, + pub(crate) status: WorkerStatus, + pub(crate) request: CreateWorkerRequest, + pub(crate) transcript: Vec, + pub(crate) next_transcript_sequence: u64, + pub(crate) last_event_id: u64, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +struct RuntimeSnapshot { + schema_version: u32, + runtime_id: RuntimeId, + display_name: Option, + backend: RuntimeBackendKind, + status: RuntimeStatus, + limits: RuntimeLimits, + next_worker_sequence: u64, + next_event_id: u64, + next_diagnostic_id: u64, + diagnostics: Vec, +} + +impl RuntimeSnapshot { + fn from_persisted(state: &PersistedRuntimeState) -> Self { + Self { + schema_version: SCHEMA_VERSION, + runtime_id: 
state.runtime_id.clone(), + display_name: state.display_name.clone(), + backend: RuntimeBackendKind::FsStore, + status: state.status, + limits: state.limits.clone(), + next_worker_sequence: state.next_worker_sequence, + next_event_id: state.next_event_id, + next_diagnostic_id: state.next_diagnostic_id, + diagnostics: state.diagnostics.clone(), + } + } + + fn validate(&self, expected_runtime_id: &RuntimeId, path: &Path) -> Result<(), RuntimeError> { + if self.schema_version != SCHEMA_VERSION { + return Err(RuntimeError::StoreCorrupt { + operation: "read runtime snapshot", + path: path.to_path_buf(), + message: format!( + "unsupported schema version {}, expected {}", + self.schema_version, SCHEMA_VERSION + ), + }); + } + if &self.runtime_id != expected_runtime_id { + return Err(RuntimeError::StoreCorrupt { + operation: "read runtime snapshot", + path: path.to_path_buf(), + message: format!( + "runtime snapshot id {} does not match requested runtime {}", + self.runtime_id, expected_runtime_id + ), + }); + } + if self.backend != RuntimeBackendKind::FsStore { + return Err(RuntimeError::StoreCorrupt { + operation: "read runtime snapshot", + path: path.to_path_buf(), + message: format!("runtime snapshot backend is {:?}", self.backend), + }); + } + Ok(()) + } + + fn into_persisted( + self, + events: Vec, + workers: BTreeMap, + ) -> PersistedRuntimeState { + PersistedRuntimeState { + runtime_id: self.runtime_id, + display_name: self.display_name, + status: self.status, + limits: self.limits, + next_worker_sequence: self.next_worker_sequence, + next_event_id: self.next_event_id, + next_diagnostic_id: self.next_diagnostic_id, + workers, + events, + diagnostics: self.diagnostics, + } + } +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +struct WorkerSnapshot { + schema_version: u32, + worker_ref: WorkerRef, + worker_id: WorkerId, + status: WorkerStatus, + request: CreateWorkerRequest, + next_transcript_sequence: u64, + last_event_id: u64, +} + +impl WorkerSnapshot { + fn 
from_persisted(worker: &PersistedWorkerRecord) -> Self { + Self { + schema_version: SCHEMA_VERSION, + worker_ref: worker.worker_ref.clone(), + worker_id: worker.worker_id.clone(), + status: worker.status, + request: worker.request.clone(), + next_transcript_sequence: worker.next_transcript_sequence, + last_event_id: worker.last_event_id, + } + } + + fn validate(&self, expected_runtime_id: &RuntimeId, path: &Path) -> Result<(), RuntimeError> { + if self.schema_version != SCHEMA_VERSION { + return Err(RuntimeError::StoreCorrupt { + operation: "read worker snapshot", + path: path.to_path_buf(), + message: format!( + "unsupported schema version {}, expected {}", + self.schema_version, SCHEMA_VERSION + ), + }); + } + if self.worker_ref.runtime_id != *expected_runtime_id { + return Err(RuntimeError::StoreCorrupt { + operation: "read worker snapshot", + path: path.to_path_buf(), + message: format!( + "worker belongs to runtime {}, expected {}", + self.worker_ref.runtime_id, expected_runtime_id + ), + }); + } + if self.worker_ref.worker_id != self.worker_id { + return Err(RuntimeError::StoreCorrupt { + operation: "read worker snapshot", + path: path.to_path_buf(), + message: format!( + "worker_ref id {} does not match worker_id {}", + self.worker_ref.worker_id, self.worker_id + ), + }); + } + Ok(()) + } + + fn into_persisted(self, transcript: Vec) -> PersistedWorkerRecord { + PersistedWorkerRecord { + worker_ref: self.worker_ref, + worker_id: self.worker_id, + status: self.status, + request: self.request, + transcript, + next_transcript_sequence: self.next_transcript_sequence, + last_event_id: self.last_event_id, + } + } +} + +fn runtime_dir(root: &Path, runtime_id: &RuntimeId) -> PathBuf { + root.join(RUNTIMES_DIR) + .join(encoded_component(runtime_id.as_str())) +} + +fn encoded_component(value: &str) -> String { + let mut encoded = String::with_capacity(3 + value.len() * 2); + encoded.push_str("id-"); + for byte in value.as_bytes() { + encoded.push(hex_digit(byte >> 4)); 
+ encoded.push(hex_digit(byte & 0x0f)); + } + encoded +} + +fn hex_digit(value: u8) -> char { + match value { + 0..=9 => (b'0' + value) as char, + 10..=15 => (b'a' + (value - 10)) as char, + _ => unreachable!("hex digit nybble is always <= 15"), + } +} + +fn read_json(path: &Path, operation: &'static str) -> Result +where + T: for<'de> Deserialize<'de>, +{ + let file = File::open(path).map_err(|source| match source.kind() { + std::io::ErrorKind::NotFound => RuntimeError::StoreMissing { + operation, + path: path.to_path_buf(), + }, + _ => RuntimeError::StoreIo { + operation, + path: path.to_path_buf(), + source, + }, + })?; + serde_json::from_reader(BufReader::new(file)).map_err(|source| RuntimeError::StoreCorrupt { + operation, + path: path.to_path_buf(), + message: source.to_string(), + }) +} + +fn read_json_lines(path: &Path, operation: &'static str) -> Result, RuntimeError> +where + T: for<'de> Deserialize<'de>, +{ + let file = File::open(path).map_err(|source| match source.kind() { + std::io::ErrorKind::NotFound => RuntimeError::StoreMissing { + operation, + path: path.to_path_buf(), + }, + _ => RuntimeError::StoreIo { + operation, + path: path.to_path_buf(), + source, + }, + })?; + let reader = BufReader::new(file); + let mut items = Vec::new(); + for (index, line) in reader.lines().enumerate() { + let line = line.map_err(|source| RuntimeError::StoreIo { + operation, + path: path.to_path_buf(), + source, + })?; + if line.trim().is_empty() { + continue; + } + let item = serde_json::from_str(&line).map_err(|source| RuntimeError::StoreCorrupt { + operation, + path: path.to_path_buf(), + message: format!("line {}: {source}", index + 1), + })?; + items.push(item); + } + Ok(items) +} + +fn atomic_write_json(path: &Path, value: &T, operation: &'static str) -> Result<(), RuntimeError> +where + T: Serialize, +{ + let parent = path.parent().ok_or_else(|| RuntimeError::StoreCorrupt { + operation, + path: path.to_path_buf(), + message: "path has no parent 
directory".to_string(), + })?; + fs::create_dir_all(parent).map_err(|source| RuntimeError::StoreIo { + operation, + path: parent.to_path_buf(), + source, + })?; + + let tmp_path = tmp_path_for(path); + let write_result = (|| { + let mut file = OpenOptions::new() + .write(true) + .create_new(true) + .open(&tmp_path) + .map_err(|source| RuntimeError::StoreIo { + operation, + path: tmp_path.clone(), + source, + })?; + serde_json::to_writer_pretty(&mut file, value).map_err(|source| { + RuntimeError::StoreCorrupt { + operation, + path: tmp_path.clone(), + message: format!("serialize json: {source}"), + } + })?; + file.write_all(b"\n") + .map_err(|source| RuntimeError::StoreIo { + operation, + path: tmp_path.clone(), + source, + })?; + file.sync_all().map_err(|source| RuntimeError::StoreIo { + operation, + path: tmp_path.clone(), + source, + })?; + drop(file); + fs::rename(&tmp_path, path).map_err(|source| RuntimeError::StoreIo { + operation, + path: path.to_path_buf(), + source, + })?; + sync_directory(parent, operation) + })(); + + if write_result.is_err() { + let _ = fs::remove_file(&tmp_path); + } + write_result +} + +fn append_json_line(path: &Path, value: &T, operation: &'static str) -> Result<(), RuntimeError> +where + T: Serialize, +{ + let parent = path.parent().ok_or_else(|| RuntimeError::StoreCorrupt { + operation, + path: path.to_path_buf(), + message: "path has no parent directory".to_string(), + })?; + fs::create_dir_all(parent).map_err(|source| RuntimeError::StoreIo { + operation, + path: parent.to_path_buf(), + source, + })?; + let mut file = OpenOptions::new() + .create(true) + .append(true) + .open(path) + .map_err(|source| RuntimeError::StoreIo { + operation, + path: path.to_path_buf(), + source, + })?; + serde_json::to_writer(&mut file, value).map_err(|source| RuntimeError::StoreCorrupt { + operation, + path: path.to_path_buf(), + message: format!("serialize json: {source}"), + })?; + file.write_all(b"\n") + .and_then(|()| file.flush()) + 
.and_then(|()| file.sync_all()) + .map_err(|source| RuntimeError::StoreIo { + operation, + path: path.to_path_buf(), + source, + }) +} + +fn ensure_file_exists(path: &Path, operation: &'static str) -> Result<(), RuntimeError> { + let parent = path.parent().ok_or_else(|| RuntimeError::StoreCorrupt { + operation, + path: path.to_path_buf(), + message: "path has no parent directory".to_string(), + })?; + fs::create_dir_all(parent).map_err(|source| RuntimeError::StoreIo { + operation, + path: parent.to_path_buf(), + source, + })?; + OpenOptions::new() + .create(true) + .append(true) + .open(path) + .and_then(|file| file.sync_all()) + .map_err(|source| RuntimeError::StoreIo { + operation, + path: path.to_path_buf(), + source, + }) +} + +fn tmp_path_for(path: &Path) -> PathBuf { + let sequence = NEXT_TMP_SEQUENCE.fetch_add(1, Ordering::Relaxed); + let file_name = path + .file_name() + .and_then(|name| name.to_str()) + .unwrap_or("store"); + path.with_file_name(format!( + ".{file_name}.tmp-{}-{sequence}", + std::process::id() + )) +} + +fn sync_directory(path: &Path, operation: &'static str) -> Result<(), RuntimeError> { + File::open(path) + .and_then(|file| file.sync_all()) + .map_err(|source| RuntimeError::StoreIo { + operation, + path: path.to_path_buf(), + source, + }) +} diff --git a/crates/worker-runtime/src/lib.rs b/crates/worker-runtime/src/lib.rs index 96bcf3c1..5b3b844f 100644 --- a/crates/worker-runtime/src/lib.rs +++ b/crates/worker-runtime/src/lib.rs @@ -1,17 +1,22 @@ //! Embedded Runtime domain API for Worker management. //! //! `worker-runtime` intentionally stays independent from HTTP/WebSocket servers, -//! filesystem persistence, provider execution, and the existing Worker host. It -//! defines the in-process Runtime authority surface that higher layers can later -//! adapt into registries or web APIs. +//! provider execution, and the existing Worker host. Filesystem persistence is +//! available only through the optional `fs-store` feature. 
The crate defines the +//! in-process Runtime authority surface that higher layers can later adapt into +//! registries or web APIs. pub mod catalog; pub mod diagnostics; pub mod error; +#[cfg(feature = "fs-store")] +pub mod fs_store; pub mod identity; pub mod interaction; pub mod management; pub mod observation; mod runtime; +#[cfg(feature = "fs-store")] +pub use fs_store::{FsRuntimeStore, FsRuntimeStoreOptions}; pub use runtime::Runtime; diff --git a/crates/worker-runtime/src/management.rs b/crates/worker-runtime/src/management.rs index 5fa2b5da..e9a926f0 100644 --- a/crates/worker-runtime/src/management.rs +++ b/crates/worker-runtime/src/management.rs @@ -1,11 +1,13 @@ use crate::identity::RuntimeId; use serde::{Deserialize, Serialize}; -/// Runtime backend kind. v0 intentionally ships only an embedded memory backend. +/// Runtime backend kind. #[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] pub enum RuntimeBackendKind { Memory, + #[cfg(feature = "fs-store")] + FsStore, } /// Runtime lifecycle state. diff --git a/crates/worker-runtime/src/runtime.rs b/crates/worker-runtime/src/runtime.rs index 54c72795..75eccdc5 100644 --- a/crates/worker-runtime/src/runtime.rs +++ b/crates/worker-runtime/src/runtime.rs @@ -3,6 +3,10 @@ use crate::catalog::{ }; use crate::diagnostics::{DiagnosticSeverity, RuntimeDiagnostic}; use crate::error::RuntimeError; +#[cfg(feature = "fs-store")] +use crate::fs_store::{ + FsRuntimeStore, FsRuntimeStoreOptions, PersistedRuntimeState, PersistedWorkerRecord, +}; use crate::identity::{RuntimeId, WorkerId, WorkerRef}; use crate::interaction::{WorkerInput, WorkerInputKind, WorkerInteractionAck}; use crate::management::{ @@ -20,8 +24,9 @@ static NEXT_RUNTIME_SEQUENCE: AtomicU64 = AtomicU64::new(1); /// Concrete embedded Runtime domain entity. /// -/// The current implementation is memory-backed and tools/provider-less by -/// design. 
It provides a typed API boundary that can later be adapted by +/// The default implementation is memory-backed and tools/provider-less by +/// design. An optional `fs-store` feature adds filesystem persistence while +/// preserving the same typed authority boundary. It can later be adapted by /// backend registries or web servers without making sockets, sessions, or paths /// public authority. #[derive(Clone, Debug)] @@ -47,6 +52,38 @@ impl Runtime { } } + /// Create or restore a filesystem-backed Runtime. + /// + /// The store is scoped by typed Runtime identity under `options.root`; if the + /// Runtime directory already exists, persisted state is loaded and validated. + /// If it does not exist, a fresh Runtime is initialized and durable files are + /// created before the Runtime is returned. + #[cfg(feature = "fs-store")] + pub fn with_fs_store(options: FsRuntimeStoreOptions) -> Result { + let runtime_id = options.runtime_id.unwrap_or_else(|| { + RuntimeId::generated(NEXT_RUNTIME_SEQUENCE.fetch_add(1, Ordering::Relaxed)) + }); + let opened = FsRuntimeStore::open_or_create(options.root, runtime_id.clone())?; + let state = if let Some(persisted) = opened.state { + RuntimeState::from_persisted(persisted, opened.store)? + } else { + let mut state = RuntimeState::new_fs_backed( + runtime_id, + options.display_name, + options.limits, + opened.store, + ); + let event_id = + state.push_event(None, RuntimeEventKind::RuntimeStarted, "runtime started"); + state.persist_runtime_snapshot()?; + state.persist_event_by_id(event_id)?; + state + }; + Ok(Self { + inner: Arc::new(Mutex::new(state)), + }) + } + /// Runtime id half of public Worker authority. 
pub fn runtime_id(&self) -> Result { Ok(self.lock()?.runtime_id.clone()) @@ -99,11 +136,15 @@ impl Runtime { worker.status = WorkerStatus::Stopped; } } - Ok(state.push_event( + let event_id = state.push_event( None, RuntimeEventKind::RuntimeStopped, format!("runtime {runtime_id} stopped"), - )) + ); + state.persist_runtime_snapshot()?; + state.persist_workers()?; + state.persist_event_by_id(event_id)?; + Ok(event_id) } /// Create a Worker in the embedded catalog. @@ -135,7 +176,10 @@ impl Runtime { }; let detail = record.detail(&state.runtime_id); state.emit_create_diagnostics(&detail); - state.workers.insert(worker_id, record); + state.workers.insert(worker_id.clone(), record); + state.persist_runtime_snapshot()?; + state.persist_worker(&worker_id)?; + state.persist_event_by_id(event_id)?; Ok(detail) } @@ -198,9 +242,15 @@ impl Runtime { event_id, }); + let status = worker.status; + state.persist_runtime_snapshot()?; + state.persist_worker(&worker_ref.worker_id)?; + state.persist_event_by_id(event_id)?; + state.persist_transcript_entry(&worker_ref.worker_id, transcript_sequence)?; + Ok(WorkerInteractionAck { worker_ref: worker_ref.clone(), - status: worker.status, + status, transcript_sequence, event_id, }) @@ -376,9 +426,13 @@ impl Runtime { let worker = state.worker_mut(worker_ref)?; worker.status = status; worker.last_event_id = event_id; + let status = worker.status; + state.persist_runtime_snapshot()?; + state.persist_worker(&worker_ref.worker_id)?; + state.persist_event_by_id(event_id)?; Ok(WorkerLifecycleAck { worker_ref: worker_ref.clone(), - status: worker.status, + status, event_id, }) } @@ -388,11 +442,21 @@ impl Runtime { } } +#[cfg_attr(not(feature = "fs-store"), allow(dead_code))] +#[derive(Clone, Debug)] +enum RuntimePersistence { + Memory, + #[cfg(feature = "fs-store")] + Fs(FsRuntimeStore), +} + #[derive(Debug)] struct RuntimeState { runtime_id: RuntimeId, display_name: Option, backend: RuntimeBackendKind, + #[cfg_attr(not(feature = "fs-store"), 
allow(dead_code))] + persistence: RuntimePersistence, status: RuntimeStatus, limits: RuntimeLimits, next_worker_sequence: u64, @@ -409,6 +473,7 @@ impl RuntimeState { runtime_id, display_name, backend: RuntimeBackendKind::Memory, + persistence: RuntimePersistence::Memory, status: RuntimeStatus::Running, limits, next_worker_sequence: 1, @@ -420,6 +485,215 @@ impl RuntimeState { } } + #[cfg(feature = "fs-store")] + fn new_fs_backed( + runtime_id: RuntimeId, + display_name: Option, + limits: RuntimeLimits, + store: FsRuntimeStore, + ) -> Self { + Self { + runtime_id, + display_name, + backend: RuntimeBackendKind::FsStore, + persistence: RuntimePersistence::Fs(store), + status: RuntimeStatus::Running, + limits, + next_worker_sequence: 1, + next_event_id: 1, + next_diagnostic_id: 1, + workers: BTreeMap::new(), + events: Vec::new(), + diagnostics: Vec::new(), + } + } + + #[cfg(feature = "fs-store")] + fn from_persisted( + persisted: PersistedRuntimeState, + store: FsRuntimeStore, + ) -> Result { + if persisted.runtime_id != *store.runtime_id() { + return Err(RuntimeError::StoreCorrupt { + operation: "restore runtime state", + path: store.runtime_dir().to_path_buf(), + message: format!( + "persisted runtime id {} does not match store runtime {}", + persisted.runtime_id, + store.runtime_id() + ), + }); + } + + let mut workers = BTreeMap::new(); + for (worker_id, worker) in persisted.workers { + workers.insert( + worker_id, + WorkerRecord { + worker_ref: worker.worker_ref, + worker_id: worker.worker_id, + status: worker.status, + request: worker.request, + transcript: worker.transcript, + next_transcript_sequence: worker.next_transcript_sequence, + last_event_id: worker.last_event_id, + }, + ); + } + + Ok(Self { + runtime_id: persisted.runtime_id, + display_name: persisted.display_name, + backend: RuntimeBackendKind::FsStore, + persistence: RuntimePersistence::Fs(store), + status: persisted.status, + limits: persisted.limits, + next_worker_sequence: 
persisted.next_worker_sequence, + next_event_id: persisted.next_event_id, + next_diagnostic_id: persisted.next_diagnostic_id, + workers, + events: persisted.events, + diagnostics: persisted.diagnostics, + }) + } + + #[cfg(feature = "fs-store")] + fn persisted_state(&self) -> PersistedRuntimeState { + PersistedRuntimeState { + runtime_id: self.runtime_id.clone(), + display_name: self.display_name.clone(), + status: self.status, + limits: self.limits.clone(), + next_worker_sequence: self.next_worker_sequence, + next_event_id: self.next_event_id, + next_diagnostic_id: self.next_diagnostic_id, + workers: self + .workers + .iter() + .map(|(worker_id, worker)| (worker_id.clone(), worker.persisted_record())) + .collect(), + events: self.events.clone(), + diagnostics: self.diagnostics.clone(), + } + } + + #[cfg(feature = "fs-store")] + fn fs_store(&self) -> Option<&FsRuntimeStore> { + match &self.persistence { + RuntimePersistence::Memory => None, + RuntimePersistence::Fs(store) => Some(store), + } + } + + #[cfg(feature = "fs-store")] + fn persist_runtime_snapshot(&self) -> Result<(), RuntimeError> { + if let Some(store) = self.fs_store() { + store.write_runtime_snapshot(&self.persisted_state())?; + } + Ok(()) + } + + #[cfg(feature = "fs-store")] + fn persist_worker(&self, worker_id: &WorkerId) -> Result<(), RuntimeError> { + if let Some(store) = self.fs_store() { + let worker = + self.workers + .get(worker_id) + .ok_or_else(|| RuntimeError::WorkerNotFound { + runtime_id: self.runtime_id.clone(), + worker_id: worker_id.clone(), + })?; + store.write_worker_snapshot(&worker.persisted_record())?; + } + Ok(()) + } + + #[cfg(feature = "fs-store")] + fn persist_event_by_id(&self, event_id: u64) -> Result<(), RuntimeError> { + if let Some(store) = self.fs_store() { + let event = self + .events + .iter() + .find(|event| event.id == event_id) + .ok_or_else(|| RuntimeError::StoreCorrupt { + operation: "persist event", + path: store.runtime_dir().to_path_buf(), + message: 
format!("event {event_id} is missing from runtime state"), + })?; + store.append_event(event)?; + } + Ok(()) + } + + /** Appends the transcript entry with the given `sequence` of worker `worker_id` through the store's append_transcript_entry. With a filesystem store attached, an unknown worker yields RuntimeError::WorkerNotFound and a missing entry yields RuntimeError::StoreCorrupt (operation "persist transcript"); without a store this is a no-op returning Ok(()). */ #[cfg(feature = "fs-store")] + fn persist_transcript_entry( + &self, + worker_id: &WorkerId, + sequence: u64, + ) -> Result<(), RuntimeError> { + if let Some(store) = self.fs_store() { + let worker = + self.workers + .get(worker_id) + .ok_or_else(|| RuntimeError::WorkerNotFound { + runtime_id: self.runtime_id.clone(), + worker_id: worker_id.clone(), + })?; + let entry = worker + .transcript + .iter() + .find(|entry| entry.sequence == sequence) + .ok_or_else(|| RuntimeError::StoreCorrupt { + operation: "persist transcript", + path: store.runtime_dir().to_path_buf(), + message: format!( + "transcript sequence {sequence} is missing from worker {worker_id}" + ), + })?; + store.append_transcript_entry(entry)?; + } + Ok(()) + } + + /** Calls persist_worker for every worker key, returning the first error encountered. Does nothing when no filesystem store is attached. */ #[cfg(feature = "fs-store")] + fn persist_workers(&self) -> Result<(), RuntimeError> { + if self.fs_store().is_some() { + for worker_id in self.workers.keys() { + self.persist_worker(worker_id)?; + } + } + Ok(()) + } + + /* The five functions below are the variants compiled when the fs-store feature is disabled: each has the same signature as its fs-store counterpart, ignores its arguments and returns Ok(()). NOTE(review): presumably this lets call sites invoke the persistence hooks unconditionally - confirm against the callers. */ #[cfg(not(feature = "fs-store"))] + fn persist_runtime_snapshot(&self) -> Result<(), RuntimeError> { + Ok(()) + } + + #[cfg(not(feature = "fs-store"))] + fn persist_worker(&self, _worker_id: &WorkerId) -> Result<(), RuntimeError> { + Ok(()) + } + + #[cfg(not(feature = "fs-store"))] + fn persist_event_by_id(&self, _event_id: u64) -> Result<(), RuntimeError> { + Ok(()) + } + + #[cfg(not(feature = "fs-store"))] + fn persist_transcript_entry( + &self, + _worker_id: &WorkerId, + _sequence: u64, + ) -> Result<(), RuntimeError> { + Ok(()) + } + + #[cfg(not(feature = "fs-store"))] + fn persist_workers(&self) -> Result<(), RuntimeError> { + Ok(()) + } + fn ensure_running(&self) -> Result<(), RuntimeError> { if self.status == RuntimeStatus::Stopped { Err(RuntimeError::RuntimeStopped { @@ -569,6 +843,19 @@ impl WorkerRecord { last_event_id: self.last_event_id, } } + + /** Copies this worker record into a PersistedWorkerRecord: clones worker_ref, worker_id, request and transcript, and copies status, next_transcript_sequence and last_event_id. */ #[cfg(feature = "fs-store")] + fn
persisted_record(&self) -> PersistedWorkerRecord { + PersistedWorkerRecord { + worker_ref: self.worker_ref.clone(), + worker_id: self.worker_id.clone(), + status: self.status, + request: self.request.clone(), + transcript: self.transcript.clone(), + next_transcript_sequence: self.next_transcript_sequence, + last_event_id: self.last_event_id, + } + } } fn validate_create_worker_request(request: &CreateWorkerRequest) -> Result<(), RuntimeError> { @@ -847,4 +1134,155 @@ mod tests { assert_eq!(next.events[0].kind, RuntimeEventKind::WorkerInputAccepted); assert!(!next.has_more); } + + /* Counter that fs_store_root uses to give every call a distinct directory name within this process. */ #[cfg(feature = "fs-store")] + static NEXT_FS_TEST_ROOT: AtomicU64 = AtomicU64::new(1); + + /** Test helper: returns a path under the system temp dir named from `label`, the process id and a per-call counter. Any directory already at that path is removed first (removal errors are ignored); the directory itself is not created here. */ #[cfg(feature = "fs-store")] + fn fs_store_root(label: &str) -> std::path::PathBuf { + let sequence = NEXT_FS_TEST_ROOT.fetch_add(1, Ordering::Relaxed); + let root = std::env::temp_dir().join(format!( + "worker-runtime-fs-store-{label}-{}-{sequence}", + std::process::id() + )); + let _ = std::fs::remove_dir_all(&root); + root + } + + /** Test helper: clones the FsRuntimeStore out of the runtime's locked state. Panics if the lock is poisoned or the runtime is memory-backed. */ #[cfg(feature = "fs-store")] + fn runtime_store(runtime: &Runtime) -> FsRuntimeStore { + let state = runtime.lock().unwrap(); + match &state.persistence { + RuntimePersistence::Fs(store) => store.clone(), + RuntimePersistence::Memory => panic!("expected fs-backed runtime"), + } + } + + /** Round-trip test: creates a worker on an fs-backed runtime, sends two inputs, stops the worker, drops the runtime, then reopens the same root and checks that the worker status, transcript and events are restored and that direct store reads agree with the runtime API. */ #[cfg(feature = "fs-store")] + #[test] + fn fs_store_restores_workers_events_and_transcripts() { + let root = fs_store_root("restore"); + let runtime_id = RuntimeId::new("runtime-fs-authority").unwrap(); + let runtime = Runtime::with_fs_store(crate::fs_store::FsRuntimeStoreOptions { + root: root.clone(), + runtime_id: Some(runtime_id.clone()), + display_name: Some("filesystem runtime".to_string()), + limits: RuntimeLimits { + max_transcript_projection_items: 2, + max_event_batch_items: 2, + }, + }) + .unwrap(); + assert_eq!( + runtime.summary().unwrap().backend, + RuntimeBackendKind::FsStore + ); + + let worker = runtime.create_worker(task_request("persist me")).unwrap(); + runtime +
.send_input(&worker.worker_ref, WorkerInput::user("first")) + .unwrap(); + runtime + .send_input(&worker.worker_ref, WorkerInput::system("second")) + .unwrap(); + runtime + .stop_worker(&worker.worker_ref, Some("finished".to_string())) + .unwrap(); + /* Keep a clone of the store handle so it can still be queried directly after the runtime is dropped. */ let store = runtime_store(&runtime); + drop(runtime); + + /* Reopen the same root and runtime id, this time with no display name and default limits. */ let restored = Runtime::with_fs_store(crate::fs_store::FsRuntimeStoreOptions { + root: root.clone(), + runtime_id: Some(runtime_id.clone()), + display_name: None, + limits: RuntimeLimits::default(), + }) + .unwrap(); + let restored_worker = restored.worker_detail(&worker.worker_ref).unwrap(); + assert_eq!(restored_worker.status, WorkerStatus::Stopped); + assert_eq!(restored_worker.transcript_len, 2); + + let projection = restored + .transcript_projection(&worker.worker_ref, TranscriptQuery::new(0, 1)) + .unwrap(); + assert_eq!(projection.total_items, 2); + assert_eq!(projection.items[0].content, "first"); + assert_eq!(projection.next_start, Some(1)); + + let cursor = restored.event_cursor_from_start().unwrap(); + let batch = restored.read_events(&cursor, 2).unwrap(); + assert_eq!(batch.events.len(), 2); + assert!(batch.has_more); + assert_eq!(batch.events[0].kind, RuntimeEventKind::RuntimeStarted); + assert_eq!(batch.events[1].kind, RuntimeEventKind::WorkerCreated); + + /* Reading through the store handle directly must return the same events and transcript as the runtime API. */ let direct_events = store.read_events(&cursor, 2, 2).unwrap(); + assert_eq!(direct_events.events, batch.events); + let direct_transcript = store + .read_transcript(&worker.worker_ref, TranscriptQuery::new(1, 1), 2) + .unwrap(); + assert_eq!(direct_transcript.items[0].content, "second"); + + let _ = std::fs::remove_dir_all(root); + } + + /** Checks the two failure modes on reopen: overwriting runtime.json with non-JSON bytes must produce RuntimeError::StoreCorrupt, and deleting a worker's worker.json must produce RuntimeError::StoreMissing. */ #[cfg(feature = "fs-store")] + #[test] + fn fs_store_reports_corrupt_and_missing_data() { + let corrupt_root = fs_store_root("corrupt"); + let corrupt_runtime_id = RuntimeId::new("runtime-corrupt").unwrap(); + let corrupt_runtime = Runtime::with_fs_store(crate::fs_store::FsRuntimeStoreOptions { + root: corrupt_root.clone(), + runtime_id:
Some(corrupt_runtime_id.clone()), + display_name: None, + limits: RuntimeLimits::default(), + }) + .unwrap(); + let corrupt_store = runtime_store(&corrupt_runtime); + /* Clobber the runtime snapshot with bytes that are not JSON. */ std::fs::write( + corrupt_store.runtime_dir().join("runtime.json"), + b"not json", + ) + .unwrap(); + drop(corrupt_runtime); + let err = Runtime::with_fs_store(crate::fs_store::FsRuntimeStoreOptions { + root: corrupt_root.clone(), + runtime_id: Some(corrupt_runtime_id), + display_name: None, + limits: RuntimeLimits::default(), + }) + .unwrap_err(); + assert!(matches!(err, RuntimeError::StoreCorrupt { .. })); + let _ = std::fs::remove_dir_all(corrupt_root); + + let missing_root = fs_store_root("missing"); + let missing_runtime_id = RuntimeId::new("runtime-missing").unwrap(); + let missing_runtime = Runtime::with_fs_store(crate::fs_store::FsRuntimeStoreOptions { + root: missing_root.clone(), + runtime_id: Some(missing_runtime_id.clone()), + display_name: None, + limits: RuntimeLimits::default(), + }) + .unwrap(); + missing_runtime + .create_worker(task_request("missing worker snapshot")) + .unwrap(); + let missing_store = runtime_store(&missing_runtime); + /* List the worker directories, sort them by path and delete worker.json from the first one. */ let mut worker_dirs = std::fs::read_dir(missing_store.runtime_dir().join("workers")) + .unwrap() + .collect::, _>>() + .unwrap(); + worker_dirs.sort_by_key(|entry| entry.path()); + std::fs::remove_file(worker_dirs[0].path().join("worker.json")).unwrap(); + drop(missing_runtime); + let err = Runtime::with_fs_store(crate::fs_store::FsRuntimeStoreOptions { + root: missing_root.clone(), + runtime_id: Some(missing_runtime_id), + display_name: None, + limits: RuntimeLimits::default(), + }) + .unwrap_err(); + assert!(matches!(err, RuntimeError::StoreMissing { ..
})); + let _ = std::fs::remove_dir_all(missing_root); + } } diff --git a/package.nix b/package.nix index b1dcfab8..61ccfcfc 100644 --- a/package.nix +++ b/package.nix @@ -43,7 +43,7 @@ rustPlatform.buildRustPackage rec { filter = sourceFilter; }; - cargoHash = "sha256-RHo2b6dVClqu32wpgES/RQeBMXaqyqXZaooeSH5SveM="; + cargoHash = "sha256-PFh+ZgmktkpeLRnIDLsxdT2QcA/j5rcJzkq7A9B6E44="; depsExtraArgs = { # Older fetchCargoVendor utilities used crates.io's API download endpoint,