runtime: persist execution binding projection

This commit is contained in:
Keisuke Hirata 2026-06-29 03:11:16 +09:00
parent 14bb4934a6
commit c29d10b67b
No known key found for this signature in database
6 changed files with 182 additions and 19 deletions

View File

@ -23,6 +23,9 @@ pub enum RuntimeError {
actual_runtime_id: RuntimeId,
},
#[error("initial worker input must be user input, got {kind}")]
InvalidInitialInputKind { kind: String },
#[error("worker {worker_id} was not found in runtime {runtime_id}")]
WorkerNotFound {
runtime_id: RuntimeId,

View File

@ -18,9 +18,29 @@ use std::sync::Arc;
pub enum WorkerExecutionBackendKind {
#[default]
Unconnected,
/// A durable execution binding was restored, but no live handle was recovered.
Stale,
Connected,
}
/// Durable, non-authority execution binding projection.
///
/// This records only enough identity to diagnose stale mappings after restore.
/// It is not a live handle and must not contain sockets, paths, credentials, or
/// provider-private authority.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct WorkerExecutionBindingIdentity {
pub backend_id: String,
}
impl WorkerExecutionBindingIdentity {
pub fn from_handle(handle: &WorkerExecutionHandle) -> Self {
Self {
backend_id: handle.backend_id.clone(),
}
}
}
/// Current execution-side run state for a Worker.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
@ -131,6 +151,8 @@ pub struct WorkerExecutionStatus {
pub backend: WorkerExecutionBackendKind,
pub run_state: WorkerExecutionRunState,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub binding: Option<WorkerExecutionBindingIdentity>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub last_result: Option<WorkerExecutionResult>,
}
@ -143,10 +165,22 @@ impl WorkerExecutionStatus {
Self {
backend: WorkerExecutionBackendKind::Connected,
run_state,
binding: None,
last_result: None,
}
}
pub fn stale(mut previous: Self) -> Self {
previous.backend = WorkerExecutionBackendKind::Stale;
previous.run_state = WorkerExecutionRunState::Unconnected;
previous
}
pub fn with_binding(mut self, binding: WorkerExecutionBindingIdentity) -> Self {
self.binding = Some(binding);
self
}
pub fn with_result(mut self, result: WorkerExecutionResult) -> Self {
self.run_state = result.run_state;
self.last_result = Some(result);

View File

@ -2,6 +2,7 @@ use crate::catalog::{CreateWorkerRequest, WorkerStatus};
use crate::config_bundle::ConfigBundle;
use crate::diagnostics::RuntimeDiagnostic;
use crate::error::RuntimeError;
use crate::execution::WorkerExecutionStatus;
use crate::identity::{RuntimeId, WorkerId, WorkerRef};
use crate::management::{RuntimeBackendKind, RuntimeLimits, RuntimeStatus};
use crate::observation::{
@ -376,6 +377,7 @@ pub(crate) struct PersistedWorkerRecord {
pub(crate) worker_id: WorkerId,
pub(crate) status: WorkerStatus,
pub(crate) request: CreateWorkerRequest,
pub(crate) execution: WorkerExecutionStatus,
pub(crate) transcript: Vec<TranscriptEntry>,
pub(crate) next_transcript_sequence: u64,
pub(crate) last_event_id: u64,
@ -473,6 +475,8 @@ struct WorkerSnapshot {
worker_id: WorkerId,
status: WorkerStatus,
request: CreateWorkerRequest,
#[serde(default = "WorkerExecutionStatus::unconnected")]
execution: WorkerExecutionStatus,
next_transcript_sequence: u64,
last_event_id: u64,
}
@ -485,6 +489,7 @@ impl WorkerSnapshot {
worker_id: worker.worker_id.clone(),
status: worker.status,
request: worker.request.clone(),
execution: worker.execution.clone(),
next_transcript_sequence: worker.next_transcript_sequence,
last_event_id: worker.last_event_id,
}
@ -530,6 +535,7 @@ impl WorkerSnapshot {
worker_id: self.worker_id,
status: self.status,
request: self.request,
execution: self.execution,
transcript,
next_transcript_sequence: self.next_transcript_sequence,
last_event_id: self.last_event_id,

View File

@ -828,6 +828,7 @@ fn status_for_runtime_error(error: &RuntimeError) -> StatusCode {
| RuntimeError::WorkerExecutionRejected { .. } => StatusCode::CONFLICT,
RuntimeError::LimitTooLarge { .. }
| RuntimeError::InvalidRequest(_)
| RuntimeError::InvalidInitialInputKind { .. }
| RuntimeError::ConfigBundleDigestMismatch { .. }
| RuntimeError::InvalidProfileSelector { .. }
| RuntimeError::UnsupportedConfigDeclaration { .. }
@ -851,6 +852,7 @@ fn code_for_runtime_error(error: &RuntimeError) -> &'static str {
RuntimeError::WorkerExecutionRejected { .. } => "worker_execution_rejected",
RuntimeError::LimitTooLarge { .. } => "limit_too_large",
RuntimeError::InvalidRequest(_) => "invalid_request",
RuntimeError::InvalidInitialInputKind { .. } => "invalid_initial_input_kind",
RuntimeError::ConfigBundleMissing { .. } => "config_bundle_missing",
RuntimeError::ConfigBundleDigestMismatch { .. } => "config_bundle_digest_mismatch",
RuntimeError::InvalidProfileSelector { .. } => "invalid_profile_selector",

View File

@ -6,13 +6,15 @@ use crate::config_bundle::{
ConfigBundle, ConfigBundleAvailability, ConfigBundleSummary, validate_config_bundle,
validate_config_bundle_ref,
};
#[cfg(feature = "fs-store")]
use crate::diagnostics::DiagnosticSeverity;
use crate::diagnostics::RuntimeDiagnostic;
use crate::error::RuntimeError;
use crate::execution::{
WorkerExecutionBackend, WorkerExecutionBackendKind, WorkerExecutionBackendRef,
WorkerExecutionHandle, WorkerExecutionOperation, WorkerExecutionResult,
WorkerExecutionRunState, WorkerExecutionSpawnRequest, WorkerExecutionSpawnResult,
WorkerExecutionStatus,
WorkerExecutionBindingIdentity, WorkerExecutionHandle, WorkerExecutionOperation,
WorkerExecutionResult, WorkerExecutionRunState, WorkerExecutionSpawnRequest,
WorkerExecutionSpawnResult, WorkerExecutionStatus,
};
#[cfg(feature = "fs-store")]
use crate::fs_store::{
@ -259,14 +261,10 @@ impl Runtime {
let mut transcript = Vec::new();
let mut next_transcript_sequence = 1;
if let Some(input) = request.initial_input.clone() {
let role = match input.kind {
WorkerInputKind::User => TranscriptRole::User,
WorkerInputKind::System => TranscriptRole::System,
};
transcript.push(TranscriptEntry {
sequence: next_transcript_sequence,
worker_ref: worker_ref.clone(),
role,
role: TranscriptRole::User,
content: input.content,
event_id,
});
@ -389,7 +387,9 @@ impl Runtime {
"worker has no execution backend",
);
let worker = state.worker_mut(worker_ref)?;
worker.execution = WorkerExecutionStatus::unconnected().with_result(result);
let mut execution = WorkerExecutionStatus::unconnected().with_result(result);
execution.binding = worker.execution.binding.clone();
worker.execution = execution;
state.persist_worker(&worker_ref.worker_id)?;
return Err(RuntimeError::WorkerExecutionUnavailable {
worker_id: worker_ref.worker_id.clone(),
@ -431,6 +431,7 @@ impl Runtime {
worker.execution = WorkerExecutionStatus {
backend: WorkerExecutionBackendKind::Connected,
run_state: dispatch_result.run_state,
binding: worker.execution.binding.clone(),
last_result: Some(dispatch_result),
};
worker.transcript.push(TranscriptEntry {
@ -485,9 +486,12 @@ impl Runtime {
let mut state = self.lock()?;
let runtime_id = state.runtime_id.clone();
let detail = {
let binding = WorkerExecutionBindingIdentity::from_handle(&handle);
let worker = state.worker_mut(worker_ref)?;
worker.execution_handle = Some(handle);
worker.execution = WorkerExecutionStatus::connected(run_state).with_result(result);
worker.execution = WorkerExecutionStatus::connected(run_state)
.with_binding(binding)
.with_result(result);
worker.detail(&runtime_id)
};
state.persist_runtime_snapshot()?;
@ -520,6 +524,7 @@ impl Runtime {
worker.execution = WorkerExecutionStatus {
backend: WorkerExecutionBackendKind::Connected,
run_state: result.run_state,
binding: worker.execution.binding.clone(),
last_result: Some(result),
};
state.persist_worker(&worker_ref.worker_id)?;
@ -891,6 +896,8 @@ struct RuntimeState {
execution_backend: Option<WorkerExecutionBackendRef>,
next_worker_sequence: u64,
next_event_id: u64,
#[cfg(feature = "fs-store")]
next_diagnostic_id: u64,
workers: BTreeMap<WorkerId, WorkerRecord>,
config_bundles: BTreeMap<String, ConfigBundle>,
events: Vec<RuntimeEvent>,
@ -915,6 +922,8 @@ impl RuntimeState {
execution_backend: None,
next_worker_sequence: 1,
next_event_id: 1,
#[cfg(feature = "fs-store")]
next_diagnostic_id: 1,
workers: BTreeMap::new(),
config_bundles: BTreeMap::new(),
events: Vec::new(),
@ -945,6 +954,8 @@ impl RuntimeState {
execution_backend: None,
next_worker_sequence: 1,
next_event_id: 1,
#[cfg(feature = "fs-store")]
next_diagnostic_id: 1,
workers: BTreeMap::new(),
config_bundles: BTreeMap::new(),
events: Vec::new(),
@ -976,7 +987,28 @@ impl RuntimeState {
}
let mut workers = BTreeMap::new();
let mut diagnostics = persisted.diagnostics;
let mut next_diagnostic_id = persisted.next_diagnostic_id;
for (worker_id, worker) in persisted.workers {
let execution = if worker.execution.binding.is_some()
&& worker.execution.backend == WorkerExecutionBackendKind::Connected
{
let stale = WorkerExecutionStatus::stale(worker.execution);
diagnostics.push(RuntimeDiagnostic {
id: next_diagnostic_id,
severity: DiagnosticSeverity::Warning,
code: "worker_execution_mapping_stale".to_string(),
message: format!(
"worker {} has persisted execution binding identity but no live execution handle was restored",
worker.worker_id
),
worker_ref: Some(worker.worker_ref.clone()),
});
next_diagnostic_id += 1;
stale
} else {
worker.execution
};
workers.insert(
worker_id,
WorkerRecord {
@ -984,7 +1016,7 @@ impl RuntimeState {
worker_id: worker.worker_id,
status: worker.status,
request: worker.request,
execution: WorkerExecutionStatus::unconnected(),
execution,
execution_handle: None,
transcript: worker.transcript,
next_transcript_sequence: worker.next_transcript_sequence,
@ -1003,11 +1035,11 @@ impl RuntimeState {
execution_backend: None,
next_worker_sequence: persisted.next_worker_sequence,
next_event_id: persisted.next_event_id,
next_diagnostic_id: persisted.next_diagnostic_id,
next_diagnostic_id,
workers,
config_bundles: persisted.config_bundles,
events: persisted.events,
diagnostics: persisted.diagnostics,
diagnostics,
#[cfg(feature = "ws-server")]
next_observation_sequence: 1,
#[cfg(feature = "ws-server")]
@ -1471,6 +1503,7 @@ impl WorkerRecord {
worker_id: self.worker_id.clone(),
status: self.status,
request: self.request.clone(),
execution: self.execution.clone(),
transcript: self.transcript.clone(),
next_transcript_sequence: self.next_transcript_sequence,
last_event_id: self.last_event_id,
@ -1498,6 +1531,11 @@ fn validate_create_worker_request(request: &CreateWorkerRequest) -> Result<(), R
));
}
if let Some(input) = &request.initial_input {
if input.kind != WorkerInputKind::User {
return Err(RuntimeError::InvalidInitialInputKind {
kind: format!("{:?}", input.kind),
});
}
if input.content.trim().is_empty() {
return Err(RuntimeError::InvalidRequest(
"initial_input.content must not be empty".to_string(),
@ -1768,6 +1806,29 @@ mod tests {
assert!(matches!(err, RuntimeError::WrongRuntime { .. }));
}
#[test]
fn create_worker_rejects_system_initial_input_without_persisting_worker() {
let runtime = runtime_with_backend();
let mut request = task_request("system initial input");
request.initial_input = Some(WorkerInput::system("role/system belongs in config bundle"));
let error = runtime.create_worker(request).unwrap_err();
assert!(matches!(
error,
RuntimeError::InvalidInitialInputKind { .. }
));
assert!(runtime.list_workers().unwrap().is_empty());
let events = runtime
.read_events(&runtime.event_cursor_from_start().unwrap(), 16)
.unwrap();
assert!(
events
.events
.iter()
.all(|event| event.kind != RuntimeEventKind::WorkerCreated)
);
}
#[test]
fn create_worker_without_execution_backend_is_rejected_and_not_persisted() {
let runtime = Runtime::new_memory();
@ -2117,6 +2178,7 @@ mod tests {
runtime.summary().unwrap().backend,
RuntimeBackendKind::FsStore
);
runtime.store_config_bundle(test_bundle()).unwrap();
let worker = runtime.create_worker(task_request("persist me")).unwrap();
runtime
@ -2140,6 +2202,28 @@ mod tests {
.unwrap();
let restored_worker = restored.worker_detail(&worker.worker_ref).unwrap();
assert_eq!(restored_worker.status, WorkerStatus::Stopped);
assert_eq!(
restored_worker.execution.backend,
WorkerExecutionBackendKind::Stale
);
assert_eq!(
restored_worker
.execution
.binding
.as_ref()
.map(|binding| binding.backend_id.as_str()),
Some("test-execution-backend")
);
assert!(
restored
.diagnostics()
.unwrap()
.iter()
.any(
|diagnostic| diagnostic.code == "worker_execution_mapping_stale"
&& diagnostic.worker_ref.as_ref() == Some(&worker.worker_ref)
)
);
assert_eq!(restored_worker.transcript_len, 2);
let projection = restored
@ -2215,13 +2299,17 @@ mod tests {
let missing_root = fs_store_root("missing");
let missing_runtime_id = RuntimeId::new("runtime-missing").unwrap();
let missing_runtime = Runtime::with_fs_store(crate::fs_store::FsRuntimeStoreOptions {
root: missing_root.clone(),
runtime_id: Some(missing_runtime_id.clone()),
display_name: None,
limits: RuntimeLimits::default(),
})
let missing_runtime = Runtime::with_fs_store_and_execution_backend(
crate::fs_store::FsRuntimeStoreOptions {
root: missing_root.clone(),
runtime_id: Some(missing_runtime_id.clone()),
display_name: None,
limits: RuntimeLimits::default(),
},
Arc::new(TestExecutionBackend::default()),
)
.unwrap();
missing_runtime.store_config_bundle(test_bundle()).unwrap();
missing_runtime
.create_worker(task_request("missing worker snapshot"))
.unwrap();

View File

@ -2356,6 +2356,11 @@ fn embedded_runtime_diagnostic(error: &EmbeddedRuntimeError) -> RuntimeDiagnosti
DiagnosticSeverity::Warning,
format!("Requested limit {requested} exceeds embedded Runtime maximum {max}"),
),
EmbeddedRuntimeError::InvalidInitialInputKind { .. } => diagnostic(
"embedded_worker_initial_input_kind_invalid",
DiagnosticSeverity::Warning,
error.to_string(),
),
EmbeddedRuntimeError::InvalidRequest(_)
| EmbeddedRuntimeError::ConfigBundleMissing { .. }
| EmbeddedRuntimeError::ConfigBundleDigestMismatch { .. }
@ -3000,6 +3005,31 @@ mod tests {
assert!(spawned.worker.is_none());
}
#[test]
fn embedded_runtime_rejects_system_initial_input_without_worker_projection() {
let runtime = EmbeddedWorkerRuntime::new_memory_with_execution_backend(
"local:test",
Arc::new(AcceptingExecutionBackend::default()),
)
.expect("test backend should connect");
let mut request = embedded_spawn_request();
request.initial_input = Some(EmbeddedWorkerInput {
kind: EmbeddedWorkerInputKind::System,
content: "system/role instruction belongs in profile".to_string(),
});
let spawned = runtime.spawn_worker(request);
assert_eq!(spawned.state, WorkerOperationState::Rejected);
assert!(spawned.worker.is_none());
assert!(spawned.diagnostics.iter().any(|diagnostic| {
diagnostic.code == "embedded_worker_initial_input_kind_invalid"
&& diagnostic
.message
.contains("initial worker input must be user input")
}));
assert!(runtime.list_workers(10).items.is_empty());
}
#[test]
fn embedded_runtime_with_execution_backend_routes_input_and_projects_transcript() {
let runtime = EmbeddedWorkerRuntime::new_memory_with_execution_backend(