feat: add worker runtime fs store
This commit is contained in:
parent
40d4138068
commit
4071343996
1
Cargo.lock
generated
1
Cargo.lock
generated
|
|
@ -5906,6 +5906,7 @@ name = "worker-runtime"
|
|||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"serde",
|
||||
"serde_json",
|
||||
"thiserror 2.0.18",
|
||||
]
|
||||
|
||||
|
|
|
|||
|
|
@ -5,6 +5,11 @@ version = "0.1.0"
|
|||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[features]
|
||||
default = []
|
||||
fs-store = ["dep:serde_json"]
|
||||
|
||||
[dependencies]
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
serde_json = { workspace = true, optional = true }
|
||||
thiserror = { workspace = true }
|
||||
|
|
|
|||
|
|
@ -1,4 +1,5 @@
|
|||
use crate::identity::{RuntimeId, WorkerId};
|
||||
use std::path::PathBuf;
|
||||
|
||||
/// Errors returned by the embedded Runtime API.
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
|
|
@ -33,6 +34,27 @@ pub enum RuntimeError {
|
|||
#[error("invalid request: {0}")]
|
||||
InvalidRequest(String),
|
||||
|
||||
#[error("runtime store {operation} failed at {}: {source}", path.display())]
|
||||
StoreIo {
|
||||
operation: &'static str,
|
||||
path: PathBuf,
|
||||
#[source]
|
||||
source: std::io::Error,
|
||||
},
|
||||
|
||||
#[error("runtime store {operation} missing data at {}", path.display())]
|
||||
StoreMissing {
|
||||
operation: &'static str,
|
||||
path: PathBuf,
|
||||
},
|
||||
|
||||
#[error("runtime store {operation} found corrupt data at {}: {message}", path.display())]
|
||||
StoreCorrupt {
|
||||
operation: &'static str,
|
||||
path: PathBuf,
|
||||
message: String,
|
||||
},
|
||||
|
||||
#[error("runtime state lock was poisoned")]
|
||||
StatePoisoned,
|
||||
}
|
||||
|
|
|
|||
755
crates/worker-runtime/src/fs_store.rs
Normal file
755
crates/worker-runtime/src/fs_store.rs
Normal file
|
|
@ -0,0 +1,755 @@
|
|||
use crate::catalog::{CreateWorkerRequest, WorkerStatus};
|
||||
use crate::diagnostics::RuntimeDiagnostic;
|
||||
use crate::error::RuntimeError;
|
||||
use crate::identity::{RuntimeId, WorkerId, WorkerRef};
|
||||
use crate::management::{RuntimeBackendKind, RuntimeLimits, RuntimeStatus};
|
||||
use crate::observation::{
|
||||
EventCursor, RuntimeEvent, RuntimeEventBatch, TranscriptEntry, TranscriptProjection,
|
||||
TranscriptQuery,
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::BTreeMap;
|
||||
use std::fs::{self, File, OpenOptions};
|
||||
use std::io::{BufRead, BufReader, Write};
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
|
||||
const SCHEMA_VERSION: u32 = 1;
|
||||
const RUNTIMES_DIR: &str = "runtimes";
|
||||
const RUNTIME_FILE: &str = "runtime.json";
|
||||
const EVENTS_FILE: &str = "events.jsonl";
|
||||
const WORKERS_DIR: &str = "workers";
|
||||
const WORKER_FILE: &str = "worker.json";
|
||||
const TRANSCRIPT_FILE: &str = "transcript.jsonl";
|
||||
|
||||
static NEXT_TMP_SEQUENCE: AtomicU64 = AtomicU64::new(1);
|
||||
|
||||
/// Options for constructing a filesystem-backed Runtime store.
|
||||
#[derive(Clone, Debug, PartialEq, Eq)]
|
||||
pub struct FsRuntimeStoreOptions {
|
||||
/// Root directory containing all Runtime-scoped store data.
|
||||
pub root: PathBuf,
|
||||
pub runtime_id: Option<RuntimeId>,
|
||||
pub display_name: Option<String>,
|
||||
pub limits: RuntimeLimits,
|
||||
}
|
||||
|
||||
impl FsRuntimeStoreOptions {
|
||||
pub fn new(root: impl Into<PathBuf>) -> Self {
|
||||
Self {
|
||||
root: root.into(),
|
||||
runtime_id: None,
|
||||
display_name: None,
|
||||
limits: RuntimeLimits::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Filesystem persistence boundary for Worker Runtime state.
|
||||
///
|
||||
/// Authority is the typed `runtime_id + worker_id` pair. Those ids are encoded
|
||||
/// into path components only after validation; legacy pod paths, socket paths,
|
||||
/// and session paths are deliberately not part of the layout or lookup API.
|
||||
#[derive(Clone, Debug, PartialEq, Eq)]
|
||||
pub struct FsRuntimeStore {
|
||||
root: PathBuf,
|
||||
runtime_id: RuntimeId,
|
||||
runtime_dir: PathBuf,
|
||||
}
|
||||
|
||||
impl FsRuntimeStore {
|
||||
pub fn root(&self) -> &Path {
|
||||
&self.root
|
||||
}
|
||||
|
||||
/// Identity of the Runtime this store handle is scoped to.
pub fn runtime_id(&self) -> &RuntimeId {
    &self.runtime_id
}
|
||||
|
||||
pub fn runtime_dir(&self) -> &Path {
|
||||
&self.runtime_dir
|
||||
}
|
||||
|
||||
/// Read persisted Runtime events directly from the event log with the same
|
||||
/// bounded cursor semantics as [`crate::Runtime::read_events`].
|
||||
pub fn read_events(
|
||||
&self,
|
||||
cursor: &EventCursor,
|
||||
limit: usize,
|
||||
max_limit: usize,
|
||||
) -> Result<RuntimeEventBatch, RuntimeError> {
|
||||
if cursor.runtime_id != self.runtime_id {
|
||||
return Err(RuntimeError::WrongRuntimeCursor {
|
||||
expected_runtime_id: self.runtime_id.clone(),
|
||||
actual_runtime_id: cursor.runtime_id.clone(),
|
||||
});
|
||||
}
|
||||
if limit > max_limit {
|
||||
return Err(RuntimeError::LimitTooLarge {
|
||||
requested: limit,
|
||||
max: max_limit,
|
||||
});
|
||||
}
|
||||
|
||||
let events = read_json_lines::<RuntimeEvent>(&self.events_path(), "read events")?;
|
||||
let mut selected = Vec::new();
|
||||
for event in events
|
||||
.iter()
|
||||
.filter(|event| event.id >= cursor.next_event_id)
|
||||
.take(limit)
|
||||
{
|
||||
selected.push(event.clone());
|
||||
}
|
||||
let next_event_id = selected
|
||||
.last()
|
||||
.map(|event| event.id + 1)
|
||||
.unwrap_or(cursor.next_event_id);
|
||||
let has_more = events.iter().any(|event| event.id >= next_event_id);
|
||||
|
||||
Ok(RuntimeEventBatch {
|
||||
runtime_id: self.runtime_id.clone(),
|
||||
cursor: EventCursor {
|
||||
runtime_id: self.runtime_id.clone(),
|
||||
next_event_id,
|
||||
},
|
||||
events: selected,
|
||||
has_more,
|
||||
})
|
||||
}
|
||||
|
||||
/// Read a persisted Worker transcript directly from its Worker-scoped log.
|
||||
pub fn read_transcript(
|
||||
&self,
|
||||
worker_ref: &WorkerRef,
|
||||
query: TranscriptQuery,
|
||||
max_limit: usize,
|
||||
) -> Result<TranscriptProjection, RuntimeError> {
|
||||
self.ensure_worker_ref(worker_ref)?;
|
||||
if query.limit > max_limit {
|
||||
return Err(RuntimeError::LimitTooLarge {
|
||||
requested: query.limit,
|
||||
max: max_limit,
|
||||
});
|
||||
}
|
||||
|
||||
let path = self.transcript_path(&worker_ref.worker_id);
|
||||
let entries = read_json_lines::<TranscriptEntry>(&path, "read transcript")?;
|
||||
let total_items = entries.len();
|
||||
let end = query.start.saturating_add(query.limit).min(total_items);
|
||||
let items = if query.start >= total_items {
|
||||
Vec::new()
|
||||
} else {
|
||||
entries[query.start..end].to_vec()
|
||||
};
|
||||
let next_start = (end < total_items).then_some(end);
|
||||
|
||||
Ok(TranscriptProjection {
|
||||
worker_ref: worker_ref.clone(),
|
||||
start: query.start,
|
||||
limit: query.limit,
|
||||
total_items,
|
||||
items,
|
||||
next_start,
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn open_or_create(
|
||||
root: PathBuf,
|
||||
runtime_id: RuntimeId,
|
||||
) -> Result<OpenedFsRuntimeStore, RuntimeError> {
|
||||
fs::create_dir_all(root.join(RUNTIMES_DIR)).map_err(|source| RuntimeError::StoreIo {
|
||||
operation: "create store root",
|
||||
path: root.join(RUNTIMES_DIR),
|
||||
source,
|
||||
})?;
|
||||
|
||||
let runtime_dir = runtime_dir(&root, &runtime_id);
|
||||
let existed = runtime_dir.exists();
|
||||
if existed && !runtime_dir.is_dir() {
|
||||
return Err(RuntimeError::StoreCorrupt {
|
||||
operation: "open runtime store",
|
||||
path: runtime_dir,
|
||||
message: "runtime path exists but is not a directory".to_string(),
|
||||
});
|
||||
}
|
||||
|
||||
fs::create_dir_all(runtime_dir.join(WORKERS_DIR)).map_err(|source| {
|
||||
RuntimeError::StoreIo {
|
||||
operation: "create runtime store",
|
||||
path: runtime_dir.join(WORKERS_DIR),
|
||||
source,
|
||||
}
|
||||
})?;
|
||||
|
||||
let store = Self {
|
||||
root,
|
||||
runtime_id,
|
||||
runtime_dir,
|
||||
};
|
||||
let state = if existed {
|
||||
Some(store.load_runtime_state()?)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
Ok(OpenedFsRuntimeStore { store, state })
|
||||
}
|
||||
|
||||
pub(crate) fn write_runtime_snapshot(
|
||||
&self,
|
||||
state: &PersistedRuntimeState,
|
||||
) -> Result<(), RuntimeError> {
|
||||
let snapshot = RuntimeSnapshot::from_persisted(state);
|
||||
atomic_write_json(&self.runtime_path(), &snapshot, "write runtime snapshot")
|
||||
}
|
||||
|
||||
pub(crate) fn write_worker_snapshot(
|
||||
&self,
|
||||
worker: &PersistedWorkerRecord,
|
||||
) -> Result<(), RuntimeError> {
|
||||
self.ensure_worker_ref(&worker.worker_ref)?;
|
||||
let worker_dir = self.worker_dir(&worker.worker_id);
|
||||
fs::create_dir_all(&worker_dir).map_err(|source| RuntimeError::StoreIo {
|
||||
operation: "create worker store",
|
||||
path: worker_dir.clone(),
|
||||
source,
|
||||
})?;
|
||||
atomic_write_json(
|
||||
&worker_dir.join(WORKER_FILE),
|
||||
&WorkerSnapshot::from_persisted(worker),
|
||||
"write worker snapshot",
|
||||
)?;
|
||||
ensure_file_exists(&worker_dir.join(TRANSCRIPT_FILE), "create transcript log")
|
||||
}
|
||||
|
||||
pub(crate) fn append_event(&self, event: &RuntimeEvent) -> Result<(), RuntimeError> {
|
||||
if let Some(worker_ref) = &event.worker_ref {
|
||||
self.ensure_worker_ref(worker_ref)?;
|
||||
}
|
||||
append_json_line(&self.events_path(), event, "append event")
|
||||
}
|
||||
|
||||
pub(crate) fn append_transcript_entry(
|
||||
&self,
|
||||
entry: &TranscriptEntry,
|
||||
) -> Result<(), RuntimeError> {
|
||||
self.ensure_worker_ref(&entry.worker_ref)?;
|
||||
append_json_line(
|
||||
&self.transcript_path(&entry.worker_ref.worker_id),
|
||||
entry,
|
||||
"append transcript",
|
||||
)
|
||||
}
|
||||
|
||||
pub(crate) fn load_runtime_state(&self) -> Result<PersistedRuntimeState, RuntimeError> {
|
||||
let runtime_path = self.runtime_path();
|
||||
let snapshot: RuntimeSnapshot = read_json(&runtime_path, "read runtime snapshot")?;
|
||||
snapshot.validate(&self.runtime_id, &runtime_path)?;
|
||||
|
||||
let events = read_json_lines::<RuntimeEvent>(&self.events_path(), "read events")?;
|
||||
let workers_dir = self.runtime_dir.join(WORKERS_DIR);
|
||||
if !workers_dir.exists() {
|
||||
return Err(RuntimeError::StoreMissing {
|
||||
operation: "read workers",
|
||||
path: workers_dir,
|
||||
});
|
||||
}
|
||||
if !workers_dir.is_dir() {
|
||||
return Err(RuntimeError::StoreCorrupt {
|
||||
operation: "read workers",
|
||||
path: workers_dir,
|
||||
message: "workers path exists but is not a directory".to_string(),
|
||||
});
|
||||
}
|
||||
|
||||
let mut workers = BTreeMap::new();
|
||||
let mut worker_dirs = fs::read_dir(&workers_dir)
|
||||
.map_err(|source| RuntimeError::StoreIo {
|
||||
operation: "read workers",
|
||||
path: workers_dir.clone(),
|
||||
source,
|
||||
})?
|
||||
.collect::<Result<Vec<_>, _>>()
|
||||
.map_err(|source| RuntimeError::StoreIo {
|
||||
operation: "read workers",
|
||||
path: workers_dir.clone(),
|
||||
source,
|
||||
})?;
|
||||
worker_dirs.sort_by_key(|entry| entry.path());
|
||||
|
||||
for entry in worker_dirs {
|
||||
let path = entry.path();
|
||||
if !path.is_dir() {
|
||||
return Err(RuntimeError::StoreCorrupt {
|
||||
operation: "read worker",
|
||||
path,
|
||||
message: "worker path exists but is not a directory".to_string(),
|
||||
});
|
||||
}
|
||||
let worker_snapshot_path = path.join(WORKER_FILE);
|
||||
let worker_snapshot: WorkerSnapshot =
|
||||
read_json(&worker_snapshot_path, "read worker snapshot")?;
|
||||
worker_snapshot.validate(&self.runtime_id, &worker_snapshot_path)?;
|
||||
let transcript =
|
||||
read_json_lines::<TranscriptEntry>(&path.join(TRANSCRIPT_FILE), "read transcript")?;
|
||||
for entry in &transcript {
|
||||
self.ensure_worker_ref(&entry.worker_ref)?;
|
||||
if entry.worker_ref.worker_id != worker_snapshot.worker_id {
|
||||
return Err(RuntimeError::StoreCorrupt {
|
||||
operation: "read transcript",
|
||||
path: path.join(TRANSCRIPT_FILE),
|
||||
message: format!(
|
||||
"transcript entry belongs to worker {}, expected {}",
|
||||
entry.worker_ref.worker_id, worker_snapshot.worker_id
|
||||
),
|
||||
});
|
||||
}
|
||||
}
|
||||
let worker = worker_snapshot.into_persisted(transcript);
|
||||
if workers.insert(worker.worker_id.clone(), worker).is_some() {
|
||||
return Err(RuntimeError::StoreCorrupt {
|
||||
operation: "read workers",
|
||||
path: workers_dir.clone(),
|
||||
message: "duplicate worker id in store".to_string(),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
Ok(snapshot.into_persisted(events, workers))
|
||||
}
|
||||
|
||||
fn ensure_worker_ref(&self, worker_ref: &WorkerRef) -> Result<(), RuntimeError> {
|
||||
if worker_ref.runtime_id == self.runtime_id {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(RuntimeError::WrongRuntime {
|
||||
expected_runtime_id: self.runtime_id.clone(),
|
||||
actual_runtime_id: worker_ref.runtime_id.clone(),
|
||||
worker_id: worker_ref.worker_id.clone(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
fn runtime_path(&self) -> PathBuf {
|
||||
self.runtime_dir.join(RUNTIME_FILE)
|
||||
}
|
||||
|
||||
fn events_path(&self) -> PathBuf {
|
||||
self.runtime_dir.join(EVENTS_FILE)
|
||||
}
|
||||
|
||||
fn worker_dir(&self, worker_id: &WorkerId) -> PathBuf {
|
||||
self.runtime_dir
|
||||
.join(WORKERS_DIR)
|
||||
.join(encoded_component(worker_id.as_str()))
|
||||
}
|
||||
|
||||
fn transcript_path(&self, worker_id: &WorkerId) -> PathBuf {
|
||||
self.worker_dir(worker_id).join(TRANSCRIPT_FILE)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct OpenedFsRuntimeStore {
|
||||
pub(crate) store: FsRuntimeStore,
|
||||
pub(crate) state: Option<PersistedRuntimeState>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub(crate) struct PersistedRuntimeState {
|
||||
pub(crate) runtime_id: RuntimeId,
|
||||
pub(crate) display_name: Option<String>,
|
||||
pub(crate) status: RuntimeStatus,
|
||||
pub(crate) limits: RuntimeLimits,
|
||||
pub(crate) next_worker_sequence: u64,
|
||||
pub(crate) next_event_id: u64,
|
||||
pub(crate) next_diagnostic_id: u64,
|
||||
pub(crate) workers: BTreeMap<WorkerId, PersistedWorkerRecord>,
|
||||
pub(crate) events: Vec<RuntimeEvent>,
|
||||
pub(crate) diagnostics: Vec<RuntimeDiagnostic>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub(crate) struct PersistedWorkerRecord {
|
||||
pub(crate) worker_ref: WorkerRef,
|
||||
pub(crate) worker_id: WorkerId,
|
||||
pub(crate) status: WorkerStatus,
|
||||
pub(crate) request: CreateWorkerRequest,
|
||||
pub(crate) transcript: Vec<TranscriptEntry>,
|
||||
pub(crate) next_transcript_sequence: u64,
|
||||
pub(crate) last_event_id: u64,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
struct RuntimeSnapshot {
|
||||
schema_version: u32,
|
||||
runtime_id: RuntimeId,
|
||||
display_name: Option<String>,
|
||||
backend: RuntimeBackendKind,
|
||||
status: RuntimeStatus,
|
||||
limits: RuntimeLimits,
|
||||
next_worker_sequence: u64,
|
||||
next_event_id: u64,
|
||||
next_diagnostic_id: u64,
|
||||
diagnostics: Vec<RuntimeDiagnostic>,
|
||||
}
|
||||
|
||||
impl RuntimeSnapshot {
|
||||
fn from_persisted(state: &PersistedRuntimeState) -> Self {
|
||||
Self {
|
||||
schema_version: SCHEMA_VERSION,
|
||||
runtime_id: state.runtime_id.clone(),
|
||||
display_name: state.display_name.clone(),
|
||||
backend: RuntimeBackendKind::FsStore,
|
||||
status: state.status,
|
||||
limits: state.limits.clone(),
|
||||
next_worker_sequence: state.next_worker_sequence,
|
||||
next_event_id: state.next_event_id,
|
||||
next_diagnostic_id: state.next_diagnostic_id,
|
||||
diagnostics: state.diagnostics.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
fn validate(&self, expected_runtime_id: &RuntimeId, path: &Path) -> Result<(), RuntimeError> {
|
||||
if self.schema_version != SCHEMA_VERSION {
|
||||
return Err(RuntimeError::StoreCorrupt {
|
||||
operation: "read runtime snapshot",
|
||||
path: path.to_path_buf(),
|
||||
message: format!(
|
||||
"unsupported schema version {}, expected {}",
|
||||
self.schema_version, SCHEMA_VERSION
|
||||
),
|
||||
});
|
||||
}
|
||||
if &self.runtime_id != expected_runtime_id {
|
||||
return Err(RuntimeError::StoreCorrupt {
|
||||
operation: "read runtime snapshot",
|
||||
path: path.to_path_buf(),
|
||||
message: format!(
|
||||
"runtime snapshot id {} does not match requested runtime {}",
|
||||
self.runtime_id, expected_runtime_id
|
||||
),
|
||||
});
|
||||
}
|
||||
if self.backend != RuntimeBackendKind::FsStore {
|
||||
return Err(RuntimeError::StoreCorrupt {
|
||||
operation: "read runtime snapshot",
|
||||
path: path.to_path_buf(),
|
||||
message: format!("runtime snapshot backend is {:?}", self.backend),
|
||||
});
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn into_persisted(
|
||||
self,
|
||||
events: Vec<RuntimeEvent>,
|
||||
workers: BTreeMap<WorkerId, PersistedWorkerRecord>,
|
||||
) -> PersistedRuntimeState {
|
||||
PersistedRuntimeState {
|
||||
runtime_id: self.runtime_id,
|
||||
display_name: self.display_name,
|
||||
status: self.status,
|
||||
limits: self.limits,
|
||||
next_worker_sequence: self.next_worker_sequence,
|
||||
next_event_id: self.next_event_id,
|
||||
next_diagnostic_id: self.next_diagnostic_id,
|
||||
workers,
|
||||
events,
|
||||
diagnostics: self.diagnostics,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
struct WorkerSnapshot {
|
||||
schema_version: u32,
|
||||
worker_ref: WorkerRef,
|
||||
worker_id: WorkerId,
|
||||
status: WorkerStatus,
|
||||
request: CreateWorkerRequest,
|
||||
next_transcript_sequence: u64,
|
||||
last_event_id: u64,
|
||||
}
|
||||
|
||||
impl WorkerSnapshot {
|
||||
fn from_persisted(worker: &PersistedWorkerRecord) -> Self {
|
||||
Self {
|
||||
schema_version: SCHEMA_VERSION,
|
||||
worker_ref: worker.worker_ref.clone(),
|
||||
worker_id: worker.worker_id.clone(),
|
||||
status: worker.status,
|
||||
request: worker.request.clone(),
|
||||
next_transcript_sequence: worker.next_transcript_sequence,
|
||||
last_event_id: worker.last_event_id,
|
||||
}
|
||||
}
|
||||
|
||||
fn validate(&self, expected_runtime_id: &RuntimeId, path: &Path) -> Result<(), RuntimeError> {
|
||||
if self.schema_version != SCHEMA_VERSION {
|
||||
return Err(RuntimeError::StoreCorrupt {
|
||||
operation: "read worker snapshot",
|
||||
path: path.to_path_buf(),
|
||||
message: format!(
|
||||
"unsupported schema version {}, expected {}",
|
||||
self.schema_version, SCHEMA_VERSION
|
||||
),
|
||||
});
|
||||
}
|
||||
if self.worker_ref.runtime_id != *expected_runtime_id {
|
||||
return Err(RuntimeError::StoreCorrupt {
|
||||
operation: "read worker snapshot",
|
||||
path: path.to_path_buf(),
|
||||
message: format!(
|
||||
"worker belongs to runtime {}, expected {}",
|
||||
self.worker_ref.runtime_id, expected_runtime_id
|
||||
),
|
||||
});
|
||||
}
|
||||
if self.worker_ref.worker_id != self.worker_id {
|
||||
return Err(RuntimeError::StoreCorrupt {
|
||||
operation: "read worker snapshot",
|
||||
path: path.to_path_buf(),
|
||||
message: format!(
|
||||
"worker_ref id {} does not match worker_id {}",
|
||||
self.worker_ref.worker_id, self.worker_id
|
||||
),
|
||||
});
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn into_persisted(self, transcript: Vec<TranscriptEntry>) -> PersistedWorkerRecord {
|
||||
PersistedWorkerRecord {
|
||||
worker_ref: self.worker_ref,
|
||||
worker_id: self.worker_id,
|
||||
status: self.status,
|
||||
request: self.request,
|
||||
transcript,
|
||||
next_transcript_sequence: self.next_transcript_sequence,
|
||||
last_event_id: self.last_event_id,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn runtime_dir(root: &Path, runtime_id: &RuntimeId) -> PathBuf {
|
||||
root.join(RUNTIMES_DIR)
|
||||
.join(encoded_component(runtime_id.as_str()))
|
||||
}
|
||||
|
||||
fn encoded_component(value: &str) -> String {
|
||||
let mut encoded = String::with_capacity(3 + value.len() * 2);
|
||||
encoded.push_str("id-");
|
||||
for byte in value.as_bytes() {
|
||||
encoded.push(hex_digit(byte >> 4));
|
||||
encoded.push(hex_digit(byte & 0x0f));
|
||||
}
|
||||
encoded
|
||||
}
|
||||
|
||||
/// Map a nybble (`0..=15`) to its lowercase hexadecimal character.
///
/// # Panics
///
/// Panics if `value` is greater than 15; callers only pass masked or shifted
/// nybbles, so that indicates a bug.
fn hex_digit(value: u8) -> char {
    char::from_digit(u32::from(value), 16)
        .unwrap_or_else(|| unreachable!("hex digit nybble is always <= 15"))
}
|
||||
|
||||
fn read_json<T>(path: &Path, operation: &'static str) -> Result<T, RuntimeError>
|
||||
where
|
||||
T: for<'de> Deserialize<'de>,
|
||||
{
|
||||
let file = File::open(path).map_err(|source| match source.kind() {
|
||||
std::io::ErrorKind::NotFound => RuntimeError::StoreMissing {
|
||||
operation,
|
||||
path: path.to_path_buf(),
|
||||
},
|
||||
_ => RuntimeError::StoreIo {
|
||||
operation,
|
||||
path: path.to_path_buf(),
|
||||
source,
|
||||
},
|
||||
})?;
|
||||
serde_json::from_reader(BufReader::new(file)).map_err(|source| RuntimeError::StoreCorrupt {
|
||||
operation,
|
||||
path: path.to_path_buf(),
|
||||
message: source.to_string(),
|
||||
})
|
||||
}
|
||||
|
||||
fn read_json_lines<T>(path: &Path, operation: &'static str) -> Result<Vec<T>, RuntimeError>
|
||||
where
|
||||
T: for<'de> Deserialize<'de>,
|
||||
{
|
||||
let file = File::open(path).map_err(|source| match source.kind() {
|
||||
std::io::ErrorKind::NotFound => RuntimeError::StoreMissing {
|
||||
operation,
|
||||
path: path.to_path_buf(),
|
||||
},
|
||||
_ => RuntimeError::StoreIo {
|
||||
operation,
|
||||
path: path.to_path_buf(),
|
||||
source,
|
||||
},
|
||||
})?;
|
||||
let reader = BufReader::new(file);
|
||||
let mut items = Vec::new();
|
||||
for (index, line) in reader.lines().enumerate() {
|
||||
let line = line.map_err(|source| RuntimeError::StoreIo {
|
||||
operation,
|
||||
path: path.to_path_buf(),
|
||||
source,
|
||||
})?;
|
||||
if line.trim().is_empty() {
|
||||
continue;
|
||||
}
|
||||
let item = serde_json::from_str(&line).map_err(|source| RuntimeError::StoreCorrupt {
|
||||
operation,
|
||||
path: path.to_path_buf(),
|
||||
message: format!("line {}: {source}", index + 1),
|
||||
})?;
|
||||
items.push(item);
|
||||
}
|
||||
Ok(items)
|
||||
}
|
||||
|
||||
fn atomic_write_json<T>(path: &Path, value: &T, operation: &'static str) -> Result<(), RuntimeError>
|
||||
where
|
||||
T: Serialize,
|
||||
{
|
||||
let parent = path.parent().ok_or_else(|| RuntimeError::StoreCorrupt {
|
||||
operation,
|
||||
path: path.to_path_buf(),
|
||||
message: "path has no parent directory".to_string(),
|
||||
})?;
|
||||
fs::create_dir_all(parent).map_err(|source| RuntimeError::StoreIo {
|
||||
operation,
|
||||
path: parent.to_path_buf(),
|
||||
source,
|
||||
})?;
|
||||
|
||||
let tmp_path = tmp_path_for(path);
|
||||
let write_result = (|| {
|
||||
let mut file = OpenOptions::new()
|
||||
.write(true)
|
||||
.create_new(true)
|
||||
.open(&tmp_path)
|
||||
.map_err(|source| RuntimeError::StoreIo {
|
||||
operation,
|
||||
path: tmp_path.clone(),
|
||||
source,
|
||||
})?;
|
||||
serde_json::to_writer_pretty(&mut file, value).map_err(|source| {
|
||||
RuntimeError::StoreCorrupt {
|
||||
operation,
|
||||
path: tmp_path.clone(),
|
||||
message: format!("serialize json: {source}"),
|
||||
}
|
||||
})?;
|
||||
file.write_all(b"\n")
|
||||
.map_err(|source| RuntimeError::StoreIo {
|
||||
operation,
|
||||
path: tmp_path.clone(),
|
||||
source,
|
||||
})?;
|
||||
file.sync_all().map_err(|source| RuntimeError::StoreIo {
|
||||
operation,
|
||||
path: tmp_path.clone(),
|
||||
source,
|
||||
})?;
|
||||
drop(file);
|
||||
fs::rename(&tmp_path, path).map_err(|source| RuntimeError::StoreIo {
|
||||
operation,
|
||||
path: path.to_path_buf(),
|
||||
source,
|
||||
})?;
|
||||
sync_directory(parent, operation)
|
||||
})();
|
||||
|
||||
if write_result.is_err() {
|
||||
let _ = fs::remove_file(&tmp_path);
|
||||
}
|
||||
write_result
|
||||
}
|
||||
|
||||
fn append_json_line<T>(path: &Path, value: &T, operation: &'static str) -> Result<(), RuntimeError>
|
||||
where
|
||||
T: Serialize,
|
||||
{
|
||||
let parent = path.parent().ok_or_else(|| RuntimeError::StoreCorrupt {
|
||||
operation,
|
||||
path: path.to_path_buf(),
|
||||
message: "path has no parent directory".to_string(),
|
||||
})?;
|
||||
fs::create_dir_all(parent).map_err(|source| RuntimeError::StoreIo {
|
||||
operation,
|
||||
path: parent.to_path_buf(),
|
||||
source,
|
||||
})?;
|
||||
let mut file = OpenOptions::new()
|
||||
.create(true)
|
||||
.append(true)
|
||||
.open(path)
|
||||
.map_err(|source| RuntimeError::StoreIo {
|
||||
operation,
|
||||
path: path.to_path_buf(),
|
||||
source,
|
||||
})?;
|
||||
serde_json::to_writer(&mut file, value).map_err(|source| RuntimeError::StoreCorrupt {
|
||||
operation,
|
||||
path: path.to_path_buf(),
|
||||
message: format!("serialize json: {source}"),
|
||||
})?;
|
||||
file.write_all(b"\n")
|
||||
.and_then(|()| file.flush())
|
||||
.and_then(|()| file.sync_all())
|
||||
.map_err(|source| RuntimeError::StoreIo {
|
||||
operation,
|
||||
path: path.to_path_buf(),
|
||||
source,
|
||||
})
|
||||
}
|
||||
|
||||
fn ensure_file_exists(path: &Path, operation: &'static str) -> Result<(), RuntimeError> {
|
||||
let parent = path.parent().ok_or_else(|| RuntimeError::StoreCorrupt {
|
||||
operation,
|
||||
path: path.to_path_buf(),
|
||||
message: "path has no parent directory".to_string(),
|
||||
})?;
|
||||
fs::create_dir_all(parent).map_err(|source| RuntimeError::StoreIo {
|
||||
operation,
|
||||
path: parent.to_path_buf(),
|
||||
source,
|
||||
})?;
|
||||
OpenOptions::new()
|
||||
.create(true)
|
||||
.append(true)
|
||||
.open(path)
|
||||
.and_then(|file| file.sync_all())
|
||||
.map_err(|source| RuntimeError::StoreIo {
|
||||
operation,
|
||||
path: path.to_path_buf(),
|
||||
source,
|
||||
})
|
||||
}
|
||||
|
||||
fn tmp_path_for(path: &Path) -> PathBuf {
|
||||
let sequence = NEXT_TMP_SEQUENCE.fetch_add(1, Ordering::Relaxed);
|
||||
let file_name = path
|
||||
.file_name()
|
||||
.and_then(|name| name.to_str())
|
||||
.unwrap_or("store");
|
||||
path.with_file_name(format!(
|
||||
".{file_name}.tmp-{}-{sequence}",
|
||||
std::process::id()
|
||||
))
|
||||
}
|
||||
|
||||
fn sync_directory(path: &Path, operation: &'static str) -> Result<(), RuntimeError> {
|
||||
File::open(path)
|
||||
.and_then(|file| file.sync_all())
|
||||
.map_err(|source| RuntimeError::StoreIo {
|
||||
operation,
|
||||
path: path.to_path_buf(),
|
||||
source,
|
||||
})
|
||||
}
|
||||
|
|
@ -1,17 +1,22 @@
|
|||
//! Embedded Runtime domain API for Worker management.
|
||||
//!
|
||||
//! `worker-runtime` intentionally stays independent from HTTP/WebSocket servers,
|
||||
//! filesystem persistence, provider execution, and the existing Worker host. It
|
||||
//! defines the in-process Runtime authority surface that higher layers can later
|
||||
//! adapt into registries or web APIs.
|
||||
//! provider execution, and the existing Worker host. Filesystem persistence is
|
||||
//! available only through the optional `fs-store` feature. The crate defines the
|
||||
//! in-process Runtime authority surface that higher layers can later adapt into
|
||||
//! registries or web APIs.
|
||||
|
||||
pub mod catalog;
|
||||
pub mod diagnostics;
|
||||
pub mod error;
|
||||
#[cfg(feature = "fs-store")]
|
||||
pub mod fs_store;
|
||||
pub mod identity;
|
||||
pub mod interaction;
|
||||
pub mod management;
|
||||
pub mod observation;
|
||||
mod runtime;
|
||||
|
||||
#[cfg(feature = "fs-store")]
|
||||
pub use fs_store::{FsRuntimeStore, FsRuntimeStoreOptions};
|
||||
pub use runtime::Runtime;
|
||||
|
|
|
|||
|
|
@ -1,11 +1,13 @@
|
|||
use crate::identity::RuntimeId;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
/// Runtime backend kind. v0 intentionally ships only an embedded memory backend.
|
||||
/// Runtime backend kind.
|
||||
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum RuntimeBackendKind {
|
||||
Memory,
|
||||
#[cfg(feature = "fs-store")]
|
||||
FsStore,
|
||||
}
|
||||
|
||||
/// Runtime lifecycle state.
|
||||
|
|
|
|||
|
|
@ -3,6 +3,10 @@ use crate::catalog::{
|
|||
};
|
||||
use crate::diagnostics::{DiagnosticSeverity, RuntimeDiagnostic};
|
||||
use crate::error::RuntimeError;
|
||||
#[cfg(feature = "fs-store")]
|
||||
use crate::fs_store::{
|
||||
FsRuntimeStore, FsRuntimeStoreOptions, PersistedRuntimeState, PersistedWorkerRecord,
|
||||
};
|
||||
use crate::identity::{RuntimeId, WorkerId, WorkerRef};
|
||||
use crate::interaction::{WorkerInput, WorkerInputKind, WorkerInteractionAck};
|
||||
use crate::management::{
|
||||
|
|
@ -20,8 +24,9 @@ static NEXT_RUNTIME_SEQUENCE: AtomicU64 = AtomicU64::new(1);
|
|||
|
||||
/// Concrete embedded Runtime domain entity.
|
||||
///
|
||||
/// The current implementation is memory-backed and tools/provider-less by
|
||||
/// design. It provides a typed API boundary that can later be adapted by
|
||||
/// The default implementation is memory-backed and tools/provider-less by
|
||||
/// design. An optional `fs-store` feature adds filesystem persistence while
|
||||
/// preserving the same typed authority boundary. It can later be adapted by
|
||||
/// backend registries or web servers without making sockets, sessions, or paths
|
||||
/// public authority.
|
||||
#[derive(Clone, Debug)]
|
||||
|
|
@ -47,6 +52,38 @@ impl Runtime {
|
|||
}
|
||||
}
|
||||
|
||||
/// Create or restore a filesystem-backed Runtime.
///
/// The store is scoped by typed Runtime identity under `options.root`. When
/// the Runtime directory already exists, its persisted state is loaded and
/// validated, and `options.display_name` / `options.limits` are not used.
/// Otherwise a fresh Runtime is initialized, a "runtime started" event is
/// recorded, and the snapshot and event are persisted before the Runtime is
/// returned.
///
/// When `options.runtime_id` is `None`, a generated id is used.
///
/// # Errors
///
/// Returns any store error raised while opening, loading, or persisting.
#[cfg(feature = "fs-store")]
pub fn with_fs_store(options: FsRuntimeStoreOptions) -> Result<Self, RuntimeError> {
    let FsRuntimeStoreOptions {
        root,
        runtime_id,
        display_name,
        limits,
    } = options;
    let runtime_id = match runtime_id {
        Some(id) => id,
        None => RuntimeId::generated(NEXT_RUNTIME_SEQUENCE.fetch_add(1, Ordering::Relaxed)),
    };

    let opened = FsRuntimeStore::open_or_create(root, runtime_id.clone())?;
    let state = match opened.state {
        Some(persisted) => RuntimeState::from_persisted(persisted, opened.store)?,
        None => {
            let mut fresh =
                RuntimeState::new_fs_backed(runtime_id, display_name, limits, opened.store);
            let event_id =
                fresh.push_event(None, RuntimeEventKind::RuntimeStarted, "runtime started");
            fresh.persist_runtime_snapshot()?;
            fresh.persist_event_by_id(event_id)?;
            fresh
        }
    };

    let inner = Arc::new(Mutex::new(state));
    Ok(Self { inner })
}
|
||||
|
||||
/// Runtime id half of public Worker authority.
|
||||
pub fn runtime_id(&self) -> Result<RuntimeId, RuntimeError> {
|
||||
Ok(self.lock()?.runtime_id.clone())
|
||||
|
|
@ -99,11 +136,15 @@ impl Runtime {
|
|||
worker.status = WorkerStatus::Stopped;
|
||||
}
|
||||
}
|
||||
Ok(state.push_event(
|
||||
let event_id = state.push_event(
|
||||
None,
|
||||
RuntimeEventKind::RuntimeStopped,
|
||||
format!("runtime {runtime_id} stopped"),
|
||||
))
|
||||
);
|
||||
state.persist_runtime_snapshot()?;
|
||||
state.persist_workers()?;
|
||||
state.persist_event_by_id(event_id)?;
|
||||
Ok(event_id)
|
||||
}
|
||||
|
||||
/// Create a Worker in the embedded catalog.
|
||||
|
|
@ -135,7 +176,10 @@ impl Runtime {
|
|||
};
|
||||
let detail = record.detail(&state.runtime_id);
|
||||
state.emit_create_diagnostics(&detail);
|
||||
state.workers.insert(worker_id, record);
|
||||
state.workers.insert(worker_id.clone(), record);
|
||||
state.persist_runtime_snapshot()?;
|
||||
state.persist_worker(&worker_id)?;
|
||||
state.persist_event_by_id(event_id)?;
|
||||
Ok(detail)
|
||||
}
|
||||
|
||||
|
|
@ -198,9 +242,15 @@ impl Runtime {
|
|||
event_id,
|
||||
});
|
||||
|
||||
let status = worker.status;
|
||||
state.persist_runtime_snapshot()?;
|
||||
state.persist_worker(&worker_ref.worker_id)?;
|
||||
state.persist_event_by_id(event_id)?;
|
||||
state.persist_transcript_entry(&worker_ref.worker_id, transcript_sequence)?;
|
||||
|
||||
Ok(WorkerInteractionAck {
|
||||
worker_ref: worker_ref.clone(),
|
||||
status: worker.status,
|
||||
status,
|
||||
transcript_sequence,
|
||||
event_id,
|
||||
})
|
||||
|
|
@ -376,9 +426,13 @@ impl Runtime {
|
|||
let worker = state.worker_mut(worker_ref)?;
|
||||
worker.status = status;
|
||||
worker.last_event_id = event_id;
|
||||
let status = worker.status;
|
||||
state.persist_runtime_snapshot()?;
|
||||
state.persist_worker(&worker_ref.worker_id)?;
|
||||
state.persist_event_by_id(event_id)?;
|
||||
Ok(WorkerLifecycleAck {
|
||||
worker_ref: worker_ref.clone(),
|
||||
status: worker.status,
|
||||
status,
|
||||
event_id,
|
||||
})
|
||||
}
|
||||
|
|
@ -388,11 +442,21 @@ impl Runtime {
|
|||
}
|
||||
}
|
||||
|
||||
/// Selects whether runtime state is mirrored to a backing store.
///
/// The `Fs` variant only exists when the `fs-store` feature is enabled.
#[derive(Debug, Clone)]
#[cfg_attr(not(feature = "fs-store"), allow(dead_code))]
enum RuntimePersistence {
    /// No backing store; the `persist_*` helpers write nothing.
    Memory,
    /// State is written through the wrapped filesystem store.
    #[cfg(feature = "fs-store")]
    Fs(FsRuntimeStore),
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct RuntimeState {
|
||||
runtime_id: RuntimeId,
|
||||
display_name: Option<String>,
|
||||
backend: RuntimeBackendKind,
|
||||
#[cfg_attr(not(feature = "fs-store"), allow(dead_code))]
|
||||
persistence: RuntimePersistence,
|
||||
status: RuntimeStatus,
|
||||
limits: RuntimeLimits,
|
||||
next_worker_sequence: u64,
|
||||
|
|
@ -409,6 +473,7 @@ impl RuntimeState {
|
|||
runtime_id,
|
||||
display_name,
|
||||
backend: RuntimeBackendKind::Memory,
|
||||
persistence: RuntimePersistence::Memory,
|
||||
status: RuntimeStatus::Running,
|
||||
limits,
|
||||
next_worker_sequence: 1,
|
||||
|
|
@ -420,6 +485,215 @@ impl RuntimeState {
|
|||
}
|
||||
}
|
||||
|
||||
/// Build the initial state of a brand-new runtime that persists through `store`.
///
/// The state starts `Running` on the `FsStore` backend, with every sequence
/// counter at 1 and no workers, events, or diagnostics.
#[cfg(feature = "fs-store")]
fn new_fs_backed(
    runtime_id: RuntimeId,
    display_name: Option<String>,
    limits: RuntimeLimits,
    store: FsRuntimeStore,
) -> Self {
    let persistence = RuntimePersistence::Fs(store);
    Self {
        // Identity and configuration supplied by the caller.
        runtime_id,
        display_name,
        limits,
        // Fixed characteristics of a fresh fs-backed runtime.
        backend: RuntimeBackendKind::FsStore,
        persistence,
        status: RuntimeStatus::Running,
        // Counters and collections start empty.
        next_worker_sequence: 1,
        next_event_id: 1,
        next_diagnostic_id: 1,
        workers: BTreeMap::new(),
        events: vec![],
        diagnostics: vec![],
    }
}
|
||||
|
||||
/// Rebuild runtime state from data previously written to `store`.
///
/// # Errors
///
/// Returns `RuntimeError::StoreCorrupt` when the persisted runtime id differs
/// from the id the store was opened with.
#[cfg(feature = "fs-store")]
fn from_persisted(
    persisted: PersistedRuntimeState,
    store: FsRuntimeStore,
) -> Result<Self, RuntimeError> {
    // Refuse to adopt a snapshot that belongs to a different runtime.
    let id_matches = persisted.runtime_id == *store.runtime_id();
    if !id_matches {
        let message = format!(
            "persisted runtime id {} does not match store runtime {}",
            persisted.runtime_id,
            store.runtime_id()
        );
        return Err(RuntimeError::StoreCorrupt {
            operation: "restore runtime state",
            path: store.runtime_dir().to_path_buf(),
            message,
        });
    }

    let PersistedRuntimeState {
        runtime_id,
        display_name,
        status,
        limits,
        next_worker_sequence,
        next_event_id,
        next_diagnostic_id,
        workers: persisted_workers,
        events,
        diagnostics,
    } = persisted;

    // Convert each persisted worker into its in-memory record, keyed as stored.
    let workers: BTreeMap<_, _> = persisted_workers
        .into_iter()
        .map(|(key, saved)| {
            let record = WorkerRecord {
                worker_ref: saved.worker_ref,
                worker_id: saved.worker_id,
                status: saved.status,
                request: saved.request,
                transcript: saved.transcript,
                next_transcript_sequence: saved.next_transcript_sequence,
                last_event_id: saved.last_event_id,
            };
            (key, record)
        })
        .collect();

    Ok(Self {
        runtime_id,
        display_name,
        backend: RuntimeBackendKind::FsStore,
        persistence: RuntimePersistence::Fs(store),
        status,
        limits,
        next_worker_sequence,
        next_event_id,
        next_diagnostic_id,
        workers,
        events,
        diagnostics,
    })
}
|
||||
|
||||
/// Capture the current state as a `PersistedRuntimeState`.
///
/// Everything is cloned, including the event and diagnostic lists and one
/// persisted record per worker, so the result is independent of `self`.
#[cfg(feature = "fs-store")]
fn persisted_state(&self) -> PersistedRuntimeState {
    let workers = self
        .workers
        .iter()
        .map(|(id, record)| (id.clone(), record.persisted_record()))
        .collect();

    PersistedRuntimeState {
        runtime_id: self.runtime_id.clone(),
        display_name: self.display_name.clone(),
        status: self.status,
        limits: self.limits.clone(),
        next_worker_sequence: self.next_worker_sequence,
        next_event_id: self.next_event_id,
        next_diagnostic_id: self.next_diagnostic_id,
        workers,
        events: self.events.clone(),
        diagnostics: self.diagnostics.clone(),
    }
}
|
||||
|
||||
/// Borrow the filesystem store, or `None` when the runtime is memory-only.
#[cfg(feature = "fs-store")]
fn fs_store(&self) -> Option<&FsRuntimeStore> {
    if let RuntimePersistence::Fs(store) = &self.persistence {
        Some(store)
    } else {
        None
    }
}
|
||||
|
||||
/// Write the full runtime snapshot to the filesystem store, if one is attached.
///
/// Does nothing and returns `Ok(())` for a memory-only runtime.
#[cfg(feature = "fs-store")]
fn persist_runtime_snapshot(&self) -> Result<(), RuntimeError> {
    match self.fs_store() {
        None => Ok(()),
        Some(store) => {
            let snapshot = self.persisted_state();
            store.write_runtime_snapshot(&snapshot)?;
            Ok(())
        }
    }
}
|
||||
|
||||
/// Write one worker's snapshot to the filesystem store, if one is attached.
///
/// # Errors
///
/// Returns `RuntimeError::WorkerNotFound` when an fs-backed runtime has no
/// worker with `worker_id`, and propagates store write failures. A
/// memory-only runtime always returns `Ok(())` without looking the worker up.
#[cfg(feature = "fs-store")]
fn persist_worker(&self, worker_id: &WorkerId) -> Result<(), RuntimeError> {
    let store = match self.fs_store() {
        Some(store) => store,
        None => return Ok(()),
    };

    let record = match self.workers.get(worker_id) {
        Some(worker) => worker.persisted_record(),
        None => {
            return Err(RuntimeError::WorkerNotFound {
                runtime_id: self.runtime_id.clone(),
                worker_id: worker_id.clone(),
            })
        }
    };

    store.write_worker_snapshot(&record)?;
    Ok(())
}
|
||||
|
||||
/// Append the in-memory event with id `event_id` to the filesystem store's event log.
///
/// Does nothing and returns `Ok(())` for a memory-only runtime.
///
/// # Errors
///
/// Returns `RuntimeError::StoreCorrupt` when no in-memory event carries
/// `event_id`, and propagates store append failures.
#[cfg(feature = "fs-store")]
fn persist_event_by_id(&self, event_id: u64) -> Result<(), RuntimeError> {
    if let Some(store) = self.fs_store() {
        // Callers persist an event right after pushing it, so it sits at (or
        // near) the tail of `events`. Searching from the back avoids walking
        // the whole log on every persist. Presumably event ids are unique
        // (assigned via `next_event_id`), so this finds the same event as a
        // front-to-back scan would.
        let event = self
            .events
            .iter()
            .rfind(|event| event.id == event_id)
            .ok_or_else(|| RuntimeError::StoreCorrupt {
                operation: "persist event",
                path: store.runtime_dir().to_path_buf(),
                message: format!("event {event_id} is missing from runtime state"),
            })?;
        store.append_event(event)?;
    }
    Ok(())
}
|
||||
|
||||
/// Append one transcript entry of a worker to the filesystem store.
///
/// Does nothing and returns `Ok(())` for a memory-only runtime.
///
/// # Errors
///
/// Returns `RuntimeError::WorkerNotFound` when the worker is unknown,
/// `RuntimeError::StoreCorrupt` when the worker has no transcript entry with
/// `sequence`, and propagates store append failures.
#[cfg(feature = "fs-store")]
fn persist_transcript_entry(
    &self,
    worker_id: &WorkerId,
    sequence: u64,
) -> Result<(), RuntimeError> {
    if let Some(store) = self.fs_store() {
        let worker = self
            .workers
            .get(worker_id)
            .ok_or_else(|| RuntimeError::WorkerNotFound {
                runtime_id: self.runtime_id.clone(),
                worker_id: worker_id.clone(),
            })?;
        // The entry being persisted was just appended by the caller, so look
        // from the tail instead of scanning the whole transcript each time.
        // Presumably sequences are unique per worker (assigned via
        // `next_transcript_sequence`), so the match is the same either way.
        let entry = worker
            .transcript
            .iter()
            .rfind(|entry| entry.sequence == sequence)
            .ok_or_else(|| RuntimeError::StoreCorrupt {
                operation: "persist transcript",
                path: store.runtime_dir().to_path_buf(),
                message: format!(
                    "transcript sequence {sequence} is missing from worker {worker_id}"
                ),
            })?;
        store.append_transcript_entry(entry)?;
    }
    Ok(())
}
|
||||
|
||||
/// Write a snapshot of every known worker to the filesystem store.
///
/// Stops at, and returns, the first error. A memory-only runtime returns
/// `Ok(())` without visiting any worker.
#[cfg(feature = "fs-store")]
fn persist_workers(&self) -> Result<(), RuntimeError> {
    if self.fs_store().is_none() {
        return Ok(());
    }
    self.workers
        .keys()
        .try_for_each(|worker_id| self.persist_worker(worker_id))
}
|
||||
|
||||
#[cfg(not(feature = "fs-store"))]
|
||||
fn persist_runtime_snapshot(&self) -> Result<(), RuntimeError> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "fs-store"))]
|
||||
fn persist_worker(&self, _worker_id: &WorkerId) -> Result<(), RuntimeError> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "fs-store"))]
|
||||
fn persist_event_by_id(&self, _event_id: u64) -> Result<(), RuntimeError> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "fs-store"))]
|
||||
fn persist_transcript_entry(
|
||||
&self,
|
||||
_worker_id: &WorkerId,
|
||||
_sequence: u64,
|
||||
) -> Result<(), RuntimeError> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "fs-store"))]
|
||||
fn persist_workers(&self) -> Result<(), RuntimeError> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn ensure_running(&self) -> Result<(), RuntimeError> {
|
||||
if self.status == RuntimeStatus::Stopped {
|
||||
Err(RuntimeError::RuntimeStopped {
|
||||
|
|
@ -569,6 +843,19 @@ impl WorkerRecord {
|
|||
last_event_id: self.last_event_id,
|
||||
}
|
||||
}
|
||||
|
||||
/// Clone this worker record into its persisted form, field for field.
#[cfg(feature = "fs-store")]
fn persisted_record(&self) -> PersistedWorkerRecord {
    let Self {
        worker_ref,
        worker_id,
        status,
        request,
        transcript,
        next_transcript_sequence,
        last_event_id,
    } = self;

    PersistedWorkerRecord {
        worker_ref: worker_ref.clone(),
        worker_id: worker_id.clone(),
        status: *status,
        request: request.clone(),
        transcript: transcript.clone(),
        next_transcript_sequence: *next_transcript_sequence,
        last_event_id: *last_event_id,
    }
}
|
||||
}
|
||||
|
||||
fn validate_create_worker_request(request: &CreateWorkerRequest) -> Result<(), RuntimeError> {
|
||||
|
|
@ -847,4 +1134,155 @@ mod tests {
|
|||
assert_eq!(next.events[0].kind, RuntimeEventKind::WorkerInputAccepted);
|
||||
assert!(!next.has_more);
|
||||
}
|
||||
|
||||
/// Counter that gives each fs-store test in this process its own root directory suffix.
#[cfg(feature = "fs-store")]
static NEXT_FS_TEST_ROOT: AtomicU64 = AtomicU64::new(1);
|
||||
|
||||
/// Return a per-test store root under the system temp directory.
///
/// The directory name combines `label`, the process id, and a process-wide
/// sequence number. Any directory already at that path is removed on a
/// best-effort basis; the directory itself is not created here.
#[cfg(feature = "fs-store")]
fn fs_store_root(label: &str) -> std::path::PathBuf {
    let sequence = NEXT_FS_TEST_ROOT.fetch_add(1, Ordering::Relaxed);
    let dir_name = format!(
        "worker-runtime-fs-store-{label}-{}-{sequence}",
        std::process::id()
    );

    let mut root = std::env::temp_dir();
    root.push(dir_name);

    // Ignore the result: the path usually does not exist yet.
    let _ = std::fs::remove_dir_all(&root);
    root
}
|
||||
|
||||
/// Clone the filesystem store out of an fs-backed runtime.
///
/// Panics when the runtime lock cannot be taken or the runtime is memory-only.
#[cfg(feature = "fs-store")]
fn runtime_store(runtime: &Runtime) -> FsRuntimeStore {
    let guard = runtime.lock().unwrap();
    if let RuntimePersistence::Fs(store) = &guard.persistence {
        return store.clone();
    }
    panic!("expected fs-backed runtime")
}
|
||||
|
||||
/// A runtime reopened on the same root and id sees the workers, transcript
/// entries, and events written by the previous instance.
#[cfg(feature = "fs-store")]
#[test]
fn fs_store_restores_workers_events_and_transcripts() {
    let root = fs_store_root("restore");
    let runtime_id = RuntimeId::new("runtime-fs-authority").unwrap();

    // First session: create a worker, feed it two inputs, then stop it.
    let first_options = crate::fs_store::FsRuntimeStoreOptions {
        root: root.clone(),
        runtime_id: Some(runtime_id.clone()),
        display_name: Some("filesystem runtime".to_string()),
        limits: RuntimeLimits {
            max_transcript_projection_items: 2,
            max_event_batch_items: 2,
        },
    };
    let runtime = Runtime::with_fs_store(first_options).unwrap();
    let backend = runtime.summary().unwrap().backend;
    assert_eq!(backend, RuntimeBackendKind::FsStore);

    let worker = runtime.create_worker(task_request("persist me")).unwrap();
    let worker_ref = &worker.worker_ref;
    runtime
        .send_input(worker_ref, WorkerInput::user("first"))
        .unwrap();
    runtime
        .send_input(worker_ref, WorkerInput::system("second"))
        .unwrap();
    runtime
        .stop_worker(worker_ref, Some("finished".to_string()))
        .unwrap();
    let store = runtime_store(&runtime);
    drop(runtime);

    // Second session: reopen the same root and id with default options.
    let second_options = crate::fs_store::FsRuntimeStoreOptions {
        root: root.clone(),
        runtime_id: Some(runtime_id),
        display_name: None,
        limits: RuntimeLimits::default(),
    };
    let restored = Runtime::with_fs_store(second_options).unwrap();

    let restored_worker = restored.worker_detail(worker_ref).unwrap();
    assert_eq!(restored_worker.status, WorkerStatus::Stopped);
    assert_eq!(restored_worker.transcript_len, 2);

    let projection = restored
        .transcript_projection(worker_ref, TranscriptQuery::new(0, 1))
        .unwrap();
    assert_eq!(projection.total_items, 2);
    assert_eq!(projection.items[0].content, "first");
    assert_eq!(projection.next_start, Some(1));

    let cursor = restored.event_cursor_from_start().unwrap();
    let batch = restored.read_events(&cursor, 2).unwrap();
    assert_eq!(batch.events.len(), 2);
    assert!(batch.has_more);
    assert_eq!(batch.events[0].kind, RuntimeEventKind::RuntimeStarted);
    assert_eq!(batch.events[1].kind, RuntimeEventKind::WorkerCreated);

    // Reading straight from the store handle must agree with the runtime API.
    let direct_events = store.read_events(&cursor, 2, 2).unwrap();
    assert_eq!(direct_events.events, batch.events);
    let direct_transcript = store
        .read_transcript(worker_ref, TranscriptQuery::new(1, 1), 2)
        .unwrap();
    assert_eq!(direct_transcript.items[0].content, "second");

    let _ = std::fs::remove_dir_all(root);
}
|
||||
|
||||
/// Reopening a store reports `StoreCorrupt` for an unparsable runtime snapshot
/// and `StoreMissing` for a deleted worker snapshot.
#[cfg(feature = "fs-store")]
#[test]
fn fs_store_reports_corrupt_and_missing_data() {
    // Both scenarios open runtimes with the same default options.
    let open = |root: &std::path::Path, runtime_id: &RuntimeId| {
        Runtime::with_fs_store(crate::fs_store::FsRuntimeStoreOptions {
            root: root.to_path_buf(),
            runtime_id: Some(runtime_id.clone()),
            display_name: None,
            limits: RuntimeLimits::default(),
        })
    };

    // Scenario 1: overwrite runtime.json with bytes that are not JSON.
    let corrupt_root = fs_store_root("corrupt");
    let corrupt_runtime_id = RuntimeId::new("runtime-corrupt").unwrap();
    let corrupt_runtime = open(&corrupt_root, &corrupt_runtime_id).unwrap();
    let corrupt_store = runtime_store(&corrupt_runtime);
    let snapshot_path = corrupt_store.runtime_dir().join("runtime.json");
    std::fs::write(snapshot_path, b"not json").unwrap();
    drop(corrupt_runtime);

    let err = open(&corrupt_root, &corrupt_runtime_id).unwrap_err();
    assert!(matches!(err, RuntimeError::StoreCorrupt { .. }));
    let _ = std::fs::remove_dir_all(corrupt_root);

    // Scenario 2: delete the worker.json of the first worker directory.
    let missing_root = fs_store_root("missing");
    let missing_runtime_id = RuntimeId::new("runtime-missing").unwrap();
    let missing_runtime = open(&missing_root, &missing_runtime_id).unwrap();
    missing_runtime
        .create_worker(task_request("missing worker snapshot"))
        .unwrap();
    let missing_store = runtime_store(&missing_runtime);
    let workers_dir = missing_store.runtime_dir().join("workers");
    let mut worker_paths = std::fs::read_dir(workers_dir)
        .unwrap()
        .map(|entry| entry.map(|dir| dir.path()))
        .collect::<Result<Vec<_>, _>>()
        .unwrap();
    worker_paths.sort();
    std::fs::remove_file(worker_paths[0].join("worker.json")).unwrap();
    drop(missing_runtime);

    let err = open(&missing_root, &missing_runtime_id).unwrap_err();
    assert!(matches!(err, RuntimeError::StoreMissing { .. }));
    let _ = std::fs::remove_dir_all(missing_root);
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -43,7 +43,7 @@ rustPlatform.buildRustPackage rec {
|
|||
filter = sourceFilter;
|
||||
};
|
||||
|
||||
cargoHash = "sha256-RHo2b6dVClqu32wpgES/RQeBMXaqyqXZaooeSH5SveM=";
|
||||
cargoHash = "sha256-PFh+ZgmktkpeLRnIDLsxdT2QcA/j5rcJzkq7A9B6E44=";
|
||||
|
||||
depsExtraArgs = {
|
||||
# Older fetchCargoVendor utilities used crates.io's API download endpoint,
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user