diff --git a/crates/worker-runtime/src/error.rs b/crates/worker-runtime/src/error.rs index 6fe917c5..14284d7f 100644 --- a/crates/worker-runtime/src/error.rs +++ b/crates/worker-runtime/src/error.rs @@ -23,6 +23,9 @@ pub enum RuntimeError { actual_runtime_id: RuntimeId, }, + #[error("initial worker input must be user input, got {kind}")] + InvalidInitialInputKind { kind: String }, + #[error("worker {worker_id} was not found in runtime {runtime_id}")] WorkerNotFound { runtime_id: RuntimeId, diff --git a/crates/worker-runtime/src/execution.rs b/crates/worker-runtime/src/execution.rs index 46d9bba3..40db94a6 100644 --- a/crates/worker-runtime/src/execution.rs +++ b/crates/worker-runtime/src/execution.rs @@ -18,9 +18,29 @@ use std::sync::Arc; pub enum WorkerExecutionBackendKind { #[default] Unconnected, + /// A durable execution binding was restored, but no live handle was recovered. + Stale, Connected, } +/// Durable, non-authority execution binding projection. +/// +/// This records only enough identity to diagnose stale mappings after restore. +/// It is not a live handle and must not contain sockets, paths, credentials, or +/// provider-private authority. +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct WorkerExecutionBindingIdentity { + pub backend_id: String, +} + +impl WorkerExecutionBindingIdentity { + pub fn from_handle(handle: &WorkerExecutionHandle) -> Self { + Self { + backend_id: handle.backend_id.clone(), + } + } +} + /// Current execution-side run state for a Worker. #[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] @@ -131,6 +151,8 @@ pub struct WorkerExecutionStatus { pub backend: WorkerExecutionBackendKind, pub run_state: WorkerExecutionRunState, #[serde(default, skip_serializing_if = "Option::is_none")] + pub binding: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] pub last_result: Option, } @@ -143,10 +165,22 @@ impl WorkerExecutionStatus { Self { backend: WorkerExecutionBackendKind::Connected, run_state, + binding: None, last_result: None, } } + pub fn stale(mut previous: Self) -> Self { + previous.backend = WorkerExecutionBackendKind::Stale; + previous.run_state = WorkerExecutionRunState::Unconnected; + previous + } + + pub fn with_binding(mut self, binding: WorkerExecutionBindingIdentity) -> Self { + self.binding = Some(binding); + self + } + pub fn with_result(mut self, result: WorkerExecutionResult) -> Self { self.run_state = result.run_state; self.last_result = Some(result); diff --git a/crates/worker-runtime/src/fs_store.rs b/crates/worker-runtime/src/fs_store.rs index bbc7d84f..710af418 100644 --- a/crates/worker-runtime/src/fs_store.rs +++ b/crates/worker-runtime/src/fs_store.rs @@ -2,6 +2,7 @@ use crate::catalog::{CreateWorkerRequest, WorkerStatus}; use crate::config_bundle::ConfigBundle; use crate::diagnostics::RuntimeDiagnostic; use crate::error::RuntimeError; +use crate::execution::WorkerExecutionStatus; use crate::identity::{RuntimeId, WorkerId, WorkerRef}; use crate::management::{RuntimeBackendKind, RuntimeLimits, RuntimeStatus}; use crate::observation::{ @@ -376,6 +377,7 @@ pub(crate) struct PersistedWorkerRecord { pub(crate) worker_id: WorkerId, pub(crate) status: WorkerStatus, pub(crate) request: CreateWorkerRequest, + pub(crate) execution: WorkerExecutionStatus, pub(crate) transcript: Vec, pub(crate) next_transcript_sequence: u64, pub(crate) last_event_id: u64, @@ -473,6 +475,8 @@ struct WorkerSnapshot { worker_id: WorkerId, status: WorkerStatus, request: CreateWorkerRequest, + #[serde(default = "WorkerExecutionStatus::unconnected")] + execution: WorkerExecutionStatus, next_transcript_sequence: u64, last_event_id: u64, } @@ -485,6 +489,7 @@ impl WorkerSnapshot { worker_id: worker.worker_id.clone(), status: worker.status, request: worker.request.clone(), + execution: worker.execution.clone(), next_transcript_sequence: worker.next_transcript_sequence, last_event_id: worker.last_event_id, } @@ -530,6 +535,7 @@ impl WorkerSnapshot { worker_id: self.worker_id, status: self.status, request: self.request, + execution: self.execution, transcript, next_transcript_sequence: self.next_transcript_sequence, last_event_id: self.last_event_id, diff --git a/crates/worker-runtime/src/http_server.rs b/crates/worker-runtime/src/http_server.rs index d53c6bde..a69f0ed9 100644 --- a/crates/worker-runtime/src/http_server.rs +++ b/crates/worker-runtime/src/http_server.rs @@ -828,6 +828,7 @@ fn status_for_runtime_error(error: &RuntimeError) -> StatusCode { | RuntimeError::WorkerExecutionRejected { .. } => StatusCode::CONFLICT, RuntimeError::LimitTooLarge { .. } | RuntimeError::InvalidRequest(_) + | RuntimeError::InvalidInitialInputKind { .. } | RuntimeError::ConfigBundleDigestMismatch { .. } | RuntimeError::InvalidProfileSelector { .. } | RuntimeError::UnsupportedConfigDeclaration { .. } @@ -851,6 +852,7 @@ fn code_for_runtime_error(error: &RuntimeError) -> &'static str { RuntimeError::WorkerExecutionRejected { .. } => "worker_execution_rejected", RuntimeError::LimitTooLarge { .. } => "limit_too_large", RuntimeError::InvalidRequest(_) => "invalid_request", + RuntimeError::InvalidInitialInputKind { .. } => "invalid_initial_input_kind", RuntimeError::ConfigBundleMissing { .. } => "config_bundle_missing", RuntimeError::ConfigBundleDigestMismatch { .. } => "config_bundle_digest_mismatch", RuntimeError::InvalidProfileSelector { .. } => "invalid_profile_selector", diff --git a/crates/worker-runtime/src/runtime.rs b/crates/worker-runtime/src/runtime.rs index 83f5022e..73e736d3 100644 --- a/crates/worker-runtime/src/runtime.rs +++ b/crates/worker-runtime/src/runtime.rs @@ -6,13 +6,15 @@ use crate::config_bundle::{ ConfigBundle, ConfigBundleAvailability, ConfigBundleSummary, validate_config_bundle, validate_config_bundle_ref, }; +#[cfg(feature = "fs-store")] +use crate::diagnostics::DiagnosticSeverity; use crate::diagnostics::RuntimeDiagnostic; use crate::error::RuntimeError; use crate::execution::{ WorkerExecutionBackend, WorkerExecutionBackendKind, WorkerExecutionBackendRef, - WorkerExecutionHandle, WorkerExecutionOperation, WorkerExecutionResult, - WorkerExecutionRunState, WorkerExecutionSpawnRequest, WorkerExecutionSpawnResult, - WorkerExecutionStatus, + WorkerExecutionBindingIdentity, WorkerExecutionHandle, WorkerExecutionOperation, + WorkerExecutionResult, WorkerExecutionRunState, WorkerExecutionSpawnRequest, + WorkerExecutionSpawnResult, WorkerExecutionStatus, }; #[cfg(feature = "fs-store")] use crate::fs_store::{ @@ -259,14 +261,10 @@ impl Runtime { let mut transcript = Vec::new(); let mut next_transcript_sequence = 1; if let Some(input) = request.initial_input.clone() { - let role = match input.kind { - WorkerInputKind::User => TranscriptRole::User, - WorkerInputKind::System => TranscriptRole::System, - }; transcript.push(TranscriptEntry { sequence: next_transcript_sequence, worker_ref: worker_ref.clone(), - role, + role: TranscriptRole::User, content: input.content, event_id, }); @@ -389,7 +387,9 @@ impl Runtime { "worker has no execution backend", ); let worker = state.worker_mut(worker_ref)?; - worker.execution = WorkerExecutionStatus::unconnected().with_result(result); + let mut execution = WorkerExecutionStatus::unconnected().with_result(result); + execution.binding = worker.execution.binding.clone(); + worker.execution = execution; state.persist_worker(&worker_ref.worker_id)?; return Err(RuntimeError::WorkerExecutionUnavailable { worker_id: worker_ref.worker_id.clone(), @@ -431,6 +431,7 @@ impl Runtime { worker.execution = WorkerExecutionStatus { backend: WorkerExecutionBackendKind::Connected, run_state: dispatch_result.run_state, + binding: worker.execution.binding.clone(), last_result: Some(dispatch_result), }; worker.transcript.push(TranscriptEntry { @@ -485,9 +486,12 @@ impl Runtime { let mut state = self.lock()?; let runtime_id = state.runtime_id.clone(); let detail = { + let binding = WorkerExecutionBindingIdentity::from_handle(&handle); let worker = state.worker_mut(worker_ref)?; worker.execution_handle = Some(handle); - worker.execution = WorkerExecutionStatus::connected(run_state).with_result(result); + worker.execution = WorkerExecutionStatus::connected(run_state) + .with_binding(binding) + .with_result(result); worker.detail(&runtime_id) }; state.persist_runtime_snapshot()?; @@ -520,6 +524,7 @@ impl Runtime { worker.execution = WorkerExecutionStatus { backend: WorkerExecutionBackendKind::Connected, run_state: result.run_state, + binding: worker.execution.binding.clone(), last_result: Some(result), }; state.persist_worker(&worker_ref.worker_id)?; @@ -891,6 +896,8 @@ struct RuntimeState { execution_backend: Option, next_worker_sequence: u64, next_event_id: u64, + #[cfg(feature = "fs-store")] + next_diagnostic_id: u64, workers: BTreeMap, config_bundles: BTreeMap, events: Vec, @@ -915,6 +922,8 @@ impl RuntimeState { execution_backend: None, next_worker_sequence: 1, next_event_id: 1, + #[cfg(feature = "fs-store")] + next_diagnostic_id: 1, workers: BTreeMap::new(), config_bundles: BTreeMap::new(), events: Vec::new(), @@ -945,6 +954,8 @@ impl RuntimeState { execution_backend: None, next_worker_sequence: 1, next_event_id: 1, + #[cfg(feature = "fs-store")] + next_diagnostic_id: 1, workers: BTreeMap::new(), config_bundles: BTreeMap::new(), events: Vec::new(), @@ -976,7 +987,28 @@ impl RuntimeState { } let mut workers = BTreeMap::new(); + let mut diagnostics = persisted.diagnostics; + let mut next_diagnostic_id = persisted.next_diagnostic_id; for (worker_id, worker) in persisted.workers { + let execution = if worker.execution.binding.is_some() + && worker.execution.backend == WorkerExecutionBackendKind::Connected + { + let stale = WorkerExecutionStatus::stale(worker.execution); + diagnostics.push(RuntimeDiagnostic { + id: next_diagnostic_id, + severity: DiagnosticSeverity::Warning, + code: "worker_execution_mapping_stale".to_string(), + message: format!( + "worker {} has persisted execution binding identity but no live execution handle was restored", + worker.worker_id + ), + worker_ref: Some(worker.worker_ref.clone()), + }); + next_diagnostic_id += 1; + stale + } else { + worker.execution + }; workers.insert( worker_id, WorkerRecord { @@ -984,7 +1016,7 @@ impl RuntimeState { worker_id: worker.worker_id, status: worker.status, request: worker.request, - execution: WorkerExecutionStatus::unconnected(), + execution, execution_handle: None, transcript: worker.transcript, next_transcript_sequence: worker.next_transcript_sequence, @@ -1003,11 +1035,11 @@ impl RuntimeState { execution_backend: None, next_worker_sequence: persisted.next_worker_sequence, next_event_id: persisted.next_event_id, - next_diagnostic_id: persisted.next_diagnostic_id, + next_diagnostic_id, workers, config_bundles: persisted.config_bundles, events: persisted.events, - diagnostics: persisted.diagnostics, + diagnostics, #[cfg(feature = "ws-server")] next_observation_sequence: 1, #[cfg(feature = "ws-server")] @@ -1471,6 +1503,7 @@ impl WorkerRecord { worker_id: self.worker_id.clone(), status: self.status, request: self.request.clone(), + execution: self.execution.clone(), transcript: self.transcript.clone(), next_transcript_sequence: self.next_transcript_sequence, last_event_id: self.last_event_id, @@ -1498,6 +1531,11 @@ fn validate_create_worker_request(request: &CreateWorkerRequest) -> Result<(), R )); } if let Some(input) = &request.initial_input { + if input.kind != WorkerInputKind::User { + return Err(RuntimeError::InvalidInitialInputKind { + kind: format!("{:?}", input.kind), + }); + } if input.content.trim().is_empty() { return Err(RuntimeError::InvalidRequest( "initial_input.content must not be empty".to_string(), @@ -1768,6 +1806,29 @@ mod tests { assert!(matches!(err, RuntimeError::WrongRuntime { .. })); } + #[test] + fn create_worker_rejects_system_initial_input_without_persisting_worker() { + let runtime = runtime_with_backend(); + let mut request = task_request("system initial input"); + request.initial_input = Some(WorkerInput::system("role/system belongs in config bundle")); + + let error = runtime.create_worker(request).unwrap_err(); + assert!(matches!( + error, + RuntimeError::InvalidInitialInputKind { .. } + )); + assert!(runtime.list_workers().unwrap().is_empty()); + let events = runtime + .read_events(&runtime.event_cursor_from_start().unwrap(), 16) + .unwrap(); + assert!( + events + .events + .iter() + .all(|event| event.kind != RuntimeEventKind::WorkerCreated) + ); + } + #[test] fn create_worker_without_execution_backend_is_rejected_and_not_persisted() { let runtime = Runtime::new_memory(); @@ -2117,6 +2178,7 @@ mod tests { runtime.summary().unwrap().backend, RuntimeBackendKind::FsStore ); + runtime.store_config_bundle(test_bundle()).unwrap(); let worker = runtime.create_worker(task_request("persist me")).unwrap(); runtime @@ -2140,6 +2202,28 @@ mod tests { .unwrap(); let restored_worker = restored.worker_detail(&worker.worker_ref).unwrap(); assert_eq!(restored_worker.status, WorkerStatus::Stopped); + assert_eq!( + restored_worker.execution.backend, + WorkerExecutionBackendKind::Stale + ); + assert_eq!( + restored_worker + .execution + .binding + .as_ref() + .map(|binding| binding.backend_id.as_str()), + Some("test-execution-backend") + ); + assert!( + restored + .diagnostics() + .unwrap() + .iter() + .any( + |diagnostic| diagnostic.code == "worker_execution_mapping_stale" + && diagnostic.worker_ref.as_ref() == Some(&worker.worker_ref) + ) + ); assert_eq!(restored_worker.transcript_len, 2); let projection = restored @@ -2215,13 +2299,17 @@ mod tests { let missing_root = fs_store_root("missing"); let missing_runtime_id = RuntimeId::new("runtime-missing").unwrap(); - let missing_runtime = Runtime::with_fs_store(crate::fs_store::FsRuntimeStoreOptions { - root: missing_root.clone(), - runtime_id: Some(missing_runtime_id.clone()), - display_name: None, - limits: RuntimeLimits::default(), - }) + let missing_runtime = Runtime::with_fs_store_and_execution_backend( + crate::fs_store::FsRuntimeStoreOptions { + root: missing_root.clone(), + runtime_id: Some(missing_runtime_id.clone()), + display_name: None, + limits: RuntimeLimits::default(), + }, + Arc::new(TestExecutionBackend::default()), + ) .unwrap(); + missing_runtime.store_config_bundle(test_bundle()).unwrap(); missing_runtime .create_worker(task_request("missing worker snapshot")) .unwrap(); diff --git a/crates/workspace-server/src/hosts.rs b/crates/workspace-server/src/hosts.rs index 955539d3..2a022a67 100644 --- a/crates/workspace-server/src/hosts.rs +++ b/crates/workspace-server/src/hosts.rs @@ -2356,6 +2356,11 @@ fn embedded_runtime_diagnostic(error: &EmbeddedRuntimeError) -> RuntimeDiagnosti DiagnosticSeverity::Warning, format!("Requested limit {requested} exceeds embedded Runtime maximum {max}"), ), + EmbeddedRuntimeError::InvalidInitialInputKind { .. } => diagnostic( + "embedded_worker_initial_input_kind_invalid", + DiagnosticSeverity::Warning, + error.to_string(), + ), EmbeddedRuntimeError::InvalidRequest(_) | EmbeddedRuntimeError::ConfigBundleMissing { .. } | EmbeddedRuntimeError::ConfigBundleDigestMismatch { .. } @@ -3000,6 +3005,31 @@ mod tests { assert!(spawned.worker.is_none()); } + #[test] + fn embedded_runtime_rejects_system_initial_input_without_worker_projection() { + let runtime = EmbeddedWorkerRuntime::new_memory_with_execution_backend( + "local:test", + Arc::new(AcceptingExecutionBackend::default()), + ) + .expect("test backend should connect"); + let mut request = embedded_spawn_request(); + request.initial_input = Some(EmbeddedWorkerInput { + kind: EmbeddedWorkerInputKind::System, + content: "system/role instruction belongs in profile".to_string(), + }); + + let spawned = runtime.spawn_worker(request); + assert_eq!(spawned.state, WorkerOperationState::Rejected); + assert!(spawned.worker.is_none()); + assert!(spawned.diagnostics.iter().any(|diagnostic| { + diagnostic.code == "embedded_worker_initial_input_kind_invalid" + && diagnostic + .message + .contains("initial worker input must be user input") + })); + assert!(runtime.list_workers(10).items.is_empty()); + } + #[test] fn embedded_runtime_with_execution_backend_routes_input_and_projects_transcript() { let runtime = EmbeddedWorkerRuntime::new_memory_with_execution_backend(